@@ -363,6 +363,14 @@ object SQLConf {
.checkValues(Set("none", "uncompressed", "snappy", "zlib", "lzo"))
.createWithDefault("snappy")

val ORC_USE_NEW_VERSION = buildConf("spark.sql.orc.useNewVersion")
Member

spark.sql.orc.impl

Member Author

No problem changing it. But since the current name was suggested by @cloud-fan, pinging @cloud-fan.

.doc("When true, use new OrcFileFormat in sql/core module instead of the one in sql/hive. " +
"Since new OrcFileFormat uses Apache ORC library instead of ORC library Hive 1.2.1, it is " +
"more stable and faster.")
Member

Tiny nit: let's take out "more stable".

Member Author

Thank you for the review, @HyukjinKwon.
Do you mean that the Apache ORC library is more stable, but the new OrcFileFormat is not, because it is newly introduced?
That is true from Spark's viewpoint, but the new OrcFileFormat also contains more bug fixes and new features. If you don't mind, I'd like to keep this. :)

Member
@gatorsmile Dec 4, 2017

When native, use the native version of ORC support instead of the ORC library in Hive 1.2.1. It defaults to hive prior to Spark 2.3.

Member Author

Thanks!

.internal()
.booleanConf
Member

.checkValues(Set("hive", "native"))
.createWithDefault("native")

Member Author

Yep.

.createWithDefault(true)
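
Putting the suggestions in this thread together — the spark.sql.orc.impl name, a string value checked against hive/native, and the reworded doc — the entry might end up looking like the sketch below. This is only a sketch of where the review seems to be heading (assumed to sit in object SQLConf next to the entries above), not the code committed in this revision.

```scala
// Sketch only: a string-valued ORC implementation switch combining the review suggestions.
val ORC_IMPLEMENTATION = buildConf("spark.sql.orc.impl")
  .doc("When native, use the native version of ORC support instead of the ORC library in " +
    "Hive 1.2.1. It defaults to hive prior to Spark 2.3.")
  .internal()
  .stringConf
  .checkValues(Set("hive", "native"))
  .createWithDefault("native")  // default as suggested above; the shipped default may differ
```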

val ORC_FILTER_PUSHDOWN_ENABLED = buildConf("spark.sql.orc.filterPushdown")
.doc("When true, enable filter pushdown for ORC files.")
.booleanConf
@@ -182,7 +182,7 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
"read files of Hive data source directly.")
}

val cls = DataSource.lookupDataSource(source)
val cls = DataSource.lookupDataSource(source, sparkSession.sessionState.conf)
if (classOf[DataSourceV2].isAssignableFrom(cls)) {
val options = new DataSourceV2Options(extraOptions.asJava)

@@ -234,7 +234,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {

assertNotBucketed("save")

val cls = DataSource.lookupDataSource(source)
val cls = DataSource.lookupDataSource(source, df.sparkSession.sessionState.conf)
if (classOf[DataSourceV2].isAssignableFrom(cls)) {
cls.newInstance() match {
case ds: WriteSupport =>
@@ -40,6 +40,7 @@ import org.apache.spark.sql.execution.datasources.{DataSource, PartitioningUtils
import org.apache.spark.sql.execution.datasources.csv.CSVFileFormat
import org.apache.spark.sql.execution.datasources.json.JsonFileFormat
import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._
import org.apache.spark.sql.util.SchemaUtils
import org.apache.spark.util.Utils
@@ -190,7 +191,7 @@ case class AlterTableAddColumnsCommand(
colsToAdd: Seq[StructField]) extends RunnableCommand {
override def run(sparkSession: SparkSession): Seq[Row] = {
val catalog = sparkSession.sessionState.catalog
val catalogTable = verifyAlterTableAddColumn(catalog, table)
val catalogTable = verifyAlterTableAddColumn(sparkSession.sessionState.conf, catalog, table)

try {
sparkSession.catalog.uncacheTable(table.quotedString)
@@ -216,6 +217,7 @@ case class AlterTableAddColumnsCommand(
* For datasource table, it currently only supports parquet, json, csv.
*/
private def verifyAlterTableAddColumn(
conf: SQLConf,
catalog: SessionCatalog,
table: TableIdentifier): CatalogTable = {
val catalogTable = catalog.getTempViewOrPermanentTableMetadata(table)
@@ -229,7 +231,7 @@ case class AlterTableAddColumnsCommand(
}

if (DDLUtils.isDatasourceTable(catalogTable)) {
DataSource.lookupDataSource(catalogTable.provider.get).newInstance() match {
DataSource.lookupDataSource(catalogTable.provider.get, conf).newInstance() match {
// For datasource table, this command can only support the following File format.
// TextFileFormat only default to one column "value"
// Hive type is already considered as hive serde table, so the logic will not
@@ -36,8 +36,10 @@ import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
import org.apache.spark.sql.execution.datasources.csv.CSVFileFormat
import org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider
import org.apache.spark.sql.execution.datasources.json.JsonFileFormat
import org.apache.spark.sql.execution.datasources.orc.OrcFileFormat
import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
import org.apache.spark.sql.execution.streaming._
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.sources._
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.types.{CalendarIntervalType, StructType}
@@ -85,7 +87,8 @@ case class DataSource(

case class SourceInfo(name: String, schema: StructType, partitionColumns: Seq[String])

lazy val providingClass: Class[_] = DataSource.lookupDataSource(className)
lazy val providingClass: Class[_] =
DataSource.lookupDataSource(className, sparkSession.sessionState.conf)
lazy val sourceInfo: SourceInfo = sourceSchema()
private val caseInsensitiveOptions = CaseInsensitiveMap(options)
private val equality = sparkSession.sessionState.conf.resolver
@@ -537,6 +540,7 @@ object DataSource extends Logging {
val csv = classOf[CSVFileFormat].getCanonicalName
val libsvm = "org.apache.spark.ml.source.libsvm.LibSVMFileFormat"
val orc = "org.apache.spark.sql.hive.orc.OrcFileFormat"
val newOrc = classOf[OrcFileFormat].getCanonicalName
Member

Please do not use a name like newXYZ. When an even newer one is added, the name will become confusing.

How about nativeOrc?

Member Author

Yep. It sounds better.


Map(
"org.apache.spark.sql.jdbc" -> jdbc,
@@ -553,6 +557,8 @@
"org.apache.spark.sql.execution.datasources.parquet.DefaultSource" -> parquet,
"org.apache.spark.sql.hive.orc.DefaultSource" -> orc,
"org.apache.spark.sql.hive.orc" -> orc,
"org.apache.spark.sql.execution.datasources.orc.DefaultSource" -> newOrc,
"org.apache.spark.sql.execution.datasources.orc" -> newOrc,
"org.apache.spark.ml.source.libsvm.DefaultSource" -> libsvm,
"org.apache.spark.ml.source.libsvm" -> libsvm,
"com.databricks.spark.csv" -> csv
@@ -568,8 +574,12 @@
"org.apache.spark.Logging")

/** Given a provider name, look up the data source class definition. */
def lookupDataSource(provider: String): Class[_] = {
val provider1 = backwardCompatibilityMap.getOrElse(provider, provider)
def lookupDataSource(provider: String, conf: SQLConf): Class[_] = {
val provider1 = backwardCompatibilityMap.getOrElse(provider, provider) match {
case name if name.equalsIgnoreCase("orc") && conf.getConf(SQLConf.ORC_USE_NEW_VERSION) =>
classOf[OrcFileFormat].getCanonicalName
case name => name
Contributor

If ORC_IMPLEMENTATION is hive, we leave the provider as it was, which may be orc. Then we will hit the "Multiple sources found" issue, won't we? Both the old and the new ORC have the same short name, orc.

Member
@HyukjinKwon Dec 5, 2017

I was looking at the exact same path. It seems not, because it's not registered in the ServiceLoader (src/main/resources/org.apache.spark.sql.sources.DataSourceRegister). So the short name for the newer ORC source would not be used here.

Member Author

@cloud-fan, to avoid that issue, the new OrcFileFormat is intentionally not registered.
@HyukjinKwon's comment is correct.

Contributor

This sounds counter-intuitive; I think we should register the new ORC instead of the old one.

Contributor

and also add comments here.

Member

+1 for ^

Member Author

I agree with both of you.

Just for explanation: the original design completely preserves the previous behavior.
Without the SQLConf.ORC_IMPLEMENTATION option, Spark doesn't know about the new OrcFileFormat, so without Hive support, creating a data source with "orc" fails as an unknown data source.

Anyway, I'm happy to update this according to your advice. :)

Member Author
@dongjoon-hyun Dec 5, 2017

So, there is no more "The ORC data source must be used with Hive support enabled" error.
If the hive implementation is requested in sql/core, it will show a more appropriate message.

Contributor

sounds good

Member Author

And here, I added the following to prevent the "Multiple sources found" issue. Last time, I missed this approach. My bad.

+      case name if name.equalsIgnoreCase("orc") &&
+        conf.getConf(SQLConf.ORC_IMPLEMENTATION) == "hive" =>
+        "org.apache.spark.sql.hive.orc.OrcFileFormat"
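
Combining the ORC_USE_NEW_VERSION branch already in this diff with the hive branch quoted above, the provider resolution would look roughly like the sketch below. This is an assembled sketch rather than the exact committed code; note that the quoted follow-up uses the later string-valued ORC_IMPLEMENTATION conf instead of the boolean conf in this revision.

```scala
// Sketch of the combined match in DataSource.lookupDataSource: map the "orc" short name to
// exactly one implementation so the ServiceLoader lookup below never sees an ambiguous match.
val provider1 = backwardCompatibilityMap.getOrElse(provider, provider) match {
  case name if name.equalsIgnoreCase("orc") && conf.getConf(SQLConf.ORC_USE_NEW_VERSION) =>
    classOf[OrcFileFormat].getCanonicalName            // sql/core (Apache ORC) implementation
  case name if name.equalsIgnoreCase("orc") =>
    "org.apache.spark.sql.hive.orc.OrcFileFormat"      // Hive 1.2.1 based implementation
  case name => name
}
```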

}
val provider2 = s"$provider1.DefaultSource"
val loader = Utils.getContextOrSparkClassLoader
val serviceLoader = ServiceLoader.load(classOf[DataSourceRegister], loader)
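
As background for the "Multiple sources found" discussion above, here is a minimal, self-contained sketch of how a ServiceLoader-based short-name lookup becomes ambiguous when two registered DataSourceRegister implementations claim the same name. It mirrors the logic of lookupDataSource only loosely; the loader choice and the printed messages are illustrative.

```scala
import java.util.ServiceLoader
import scala.collection.JavaConverters._
import org.apache.spark.sql.sources.DataSourceRegister

// Collect every registered data source whose short name is "orc". If both the sql/hive and
// sql/core ORC formats were listed in META-INF/services, two entries would come back and a
// real lookup would have to fail with a "Multiple sources found" style error.
val loader = Thread.currentThread().getContextClassLoader
val orcSources = ServiceLoader.load(classOf[DataSourceRegister], loader).asScala
  .filter(_.shortName().equalsIgnoreCase("orc"))
  .toList

orcSources match {
  case single :: Nil => println(s"resolved to ${single.getClass.getCanonicalName}")
  case Nil           => println("no registered source named 'orc'")
  case multiple      => println(s"ambiguous: ${multiple.map(_.getClass.getName).mkString(", ")}")
}
```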
@@ -108,8 +108,9 @@ case class PreprocessTableCreation(sparkSession: SparkSession) extends Rule[Logi
}

// Check if the specified data source match the data source of the existing table.
val existingProvider = DataSource.lookupDataSource(existingTable.provider.get)
val specifiedProvider = DataSource.lookupDataSource(tableDesc.provider.get)
val conf = sparkSession.sessionState.conf
val existingProvider = DataSource.lookupDataSource(existingTable.provider.get, conf)
val specifiedProvider = DataSource.lookupDataSource(tableDesc.provider.get, conf)
// TODO: Check that options from the resolved relation match the relation that we are
// inserting into (i.e. using the same compression).
if (existingProvider != specifiedProvider) {
21 changes: 21 additions & 0 deletions sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@@ -28,6 +28,8 @@ import org.apache.spark.scheduler.{SparkListener, SparkListenerJobStart}
import org.apache.spark.sql.catalyst.util.StringUtils
import org.apache.spark.sql.execution.aggregate
import org.apache.spark.sql.execution.aggregate.{HashAggregateExec, SortAggregateExec}
import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation}
import org.apache.spark.sql.execution.datasources.orc.OrcFileFormat
import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, CartesianProductExec, SortMergeJoinExec}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.internal.SQLConf
@@ -2782,4 +2784,23 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
assert(spark.read.format(orc).load(path).collect().length == 2)
}
}

test("SPARK-20728 Make ORCFileFormat configurable between sql/hive and sql/core") {
withSQLConf(SQLConf.ORC_USE_NEW_VERSION.key -> "false") {
val e = intercept[AnalysisException] {
sql("CREATE TABLE spark_20728(a INT) USING ORC")
}
assert(e.message.contains("The ORC data source must be used with Hive support enabled"))
}

withSQLConf(SQLConf.ORC_USE_NEW_VERSION.key -> "true") {
withTable("spark_20728") {
sql("CREATE TABLE spark_20728(a INT) USING ORC")
val fileFormat = sql("SELECT * FROM spark_20728").queryExecution.analyzed.collectFirst {
case l: LogicalRelation => l.relation.asInstanceOf[HadoopFsRelation].fileFormat.getClass
}
assert(fileFormat == Some(classOf[OrcFileFormat]))
}
}
}
}
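
Outside of the test suite, the same switch could be exercised from user code roughly as in the hypothetical sketch below (the path is a placeholder, and the key follows this revision of the PR; it may be renamed per the review above).

```scala
// Use the new sql/core ORC implementation for this session, then read an ORC directory.
spark.conf.set("spark.sql.orc.useNewVersion", "true")
val df = spark.read.format("orc").load("/tmp/orc-data")   // placeholder path
df.printSchema()

// Switching back selects the Hive 1.2.1 based implementation (requires Hive support).
spark.conf.set("spark.sql.orc.useNewVersion", "false")
```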
@@ -53,13 +53,6 @@ class DDLSourceLoadSuite extends DataSourceTest with SharedSQLContext {
assert(spark.read.format("org.apache.spark.sql.sources.FakeSourceOne")
.load().schema == StructType(Seq(StructField("stringType", StringType, nullable = false))))
}

test("should fail to load ORC without Hive Support") {
val e = intercept[AnalysisException] {
spark.read.format("orc").load()
}
assert(e.message.contains("The ORC data source must be used with Hive support enabled"))
}
}


@@ -155,7 +155,6 @@ class DataFrameReaderWriterSuite extends QueryTest with SharedSQLContext with Be
}
}


test("resolve default source") {
spark.read
.format("org.apache.spark.sql.test")
@@ -478,42 +477,56 @@ class DataFrameReaderWriterSuite extends QueryTest with SharedSQLContext with Be
spark.read.schema(userSchema).parquet(Seq(dir, dir): _*), expData ++ expData, userSchema)
}

/**
* This only tests whether API compiles, but does not run it as orc()
* cannot be run without Hive classes.
*/
ignore("orc - API") {
// Reader, with user specified schema
// Refer to csv-specific test suites for behavior without user specified schema
spark.read.schema(userSchema).orc()
spark.read.schema(userSchema).orc(dir)
spark.read.schema(userSchema).orc(dir, dir, dir)
spark.read.schema(userSchema).orc(Seq(dir, dir): _*)
Option(dir).map(spark.read.schema(userSchema).orc)
test("orc - API and behavior regarding schema") {
withSQLConf(SQLConf.ORC_USE_NEW_VERSION.key -> "true") {
// Writer
spark.createDataset(data).toDF("str").write.mode(SaveMode.Overwrite).orc(dir)
val df = spark.read.orc(dir)
checkAnswer(df, spark.createDataset(data).toDF())
val schema = df.schema

// Writer
spark.range(10).write.orc(dir)
// Reader, without user specified schema
intercept[AnalysisException] {
testRead(spark.read.orc(), Seq.empty, schema)
}
testRead(spark.read.orc(dir), data, schema)
testRead(spark.read.orc(dir, dir), data ++ data, schema)
testRead(spark.read.orc(Seq(dir, dir): _*), data ++ data, schema)
// Test explicit calls to single arg method - SPARK-16009
testRead(Option(dir).map(spark.read.orc).get, data, schema)

// Reader, with user specified schema, data should be nulls as schema in file different
// from user schema
val expData = Seq[String](null, null, null)
testRead(spark.read.schema(userSchema).orc(), Seq.empty, userSchema)
testRead(spark.read.schema(userSchema).orc(dir), expData, userSchema)
testRead(spark.read.schema(userSchema).orc(dir, dir), expData ++ expData, userSchema)
testRead(
spark.read.schema(userSchema).orc(Seq(dir, dir): _*), expData ++ expData, userSchema)
}
}

test("column nullability and comment - write and then read") {
Seq("json", "parquet", "csv").foreach { format =>
val schema = StructType(
StructField("cl1", IntegerType, nullable = false).withComment("test") ::
StructField("cl2", IntegerType, nullable = true) ::
StructField("cl3", IntegerType, nullable = true) :: Nil)
val row = Row(3, null, 4)
val df = spark.createDataFrame(sparkContext.parallelize(row :: Nil), schema)

val tableName = "tab"
withTable(tableName) {
df.write.format(format).mode("overwrite").saveAsTable(tableName)
// Verify the DDL command result: DESCRIBE TABLE
checkAnswer(
sql(s"desc $tableName").select("col_name", "comment").where($"comment" === "test"),
Row("cl1", "test") :: Nil)
// Verify the schema
val expectedFields = schema.fields.map(f => f.copy(nullable = true))
assert(spark.table(tableName).schema == schema.copy(fields = expectedFields))
withSQLConf(SQLConf.ORC_USE_NEW_VERSION.key -> "true") {
Seq("json", "orc", "parquet", "csv").foreach { format =>
val schema = StructType(
StructField("cl1", IntegerType, nullable = false).withComment("test") ::
StructField("cl2", IntegerType, nullable = true) ::
StructField("cl3", IntegerType, nullable = true) :: Nil)
val row = Row(3, null, 4)
val df = spark.createDataFrame(sparkContext.parallelize(row :: Nil), schema)

val tableName = "tab"
withTable(tableName) {
df.write.format(format).mode("overwrite").saveAsTable(tableName)
// Verify the DDL command result: DESCRIBE TABLE
checkAnswer(
sql(s"desc $tableName").select("col_name", "comment").where($"comment" === "test"),
Row("cl1", "test") :: Nil)
// Verify the schema
val expectedFields = schema.fields.map(f => f.copy(nullable = true))
assert(spark.table(tableName).schema == schema.copy(fields = expectedFields))
}
}
}
}
@@ -34,7 +34,6 @@ import org.apache.spark.sql.execution.command.{CreateTableCommand, DDLUtils}
import org.apache.spark.sql.execution.datasources.{CreateTable, LogicalRelation}
import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat, ParquetOptions}
import org.apache.spark.sql.hive.execution._
import org.apache.spark.sql.hive.orc.OrcFileFormat
import org.apache.spark.sql.internal.{HiveSerDe, SQLConf}


@@ -195,8 +194,19 @@ case class RelationConversions(
.convertToLogicalRelation(relation, options, classOf[ParquetFileFormat], "parquet")
} else {
val options = relation.tableMeta.storage.properties
sessionCatalog.metastoreCatalog
.convertToLogicalRelation(relation, options, classOf[OrcFileFormat], "orc")
if (conf.getConf(SQLConf.ORC_USE_NEW_VERSION)) {
sessionCatalog.metastoreCatalog.convertToLogicalRelation(
relation,
options,
classOf[org.apache.spark.sql.execution.datasources.orc.OrcFileFormat],
"orc")
} else {
sessionCatalog.metastoreCatalog.convertToLogicalRelation(
relation,
options,
classOf[org.apache.spark.sql.hive.orc.OrcFileFormat],
"orc")
}
}
}
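
A behavior-equivalent way to write the branch above, shown purely as a possible cleanup sketch rather than as part of this PR, is to select the FileFormat class once and make a single convertToLogicalRelation call:

```scala
// Pick the ORC FileFormat class according to the conf, then convert once.
val orcFileFormatClass =
  if (conf.getConf(SQLConf.ORC_USE_NEW_VERSION)) {
    classOf[org.apache.spark.sql.execution.datasources.orc.OrcFileFormat]
  } else {
    classOf[org.apache.spark.sql.hive.orc.OrcFileFormat]
  }
sessionCatalog.metastoreCatalog
  .convertToLogicalRelation(relation, options, orcFileFormatClass, "orc")
```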

@@ -28,7 +28,7 @@ import org.scalatest.BeforeAndAfterAll
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.catalog.HiveTableRelation
import org.apache.spark.sql.execution.datasources.{LogicalRelation, RecordReaderIterator}
import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation, RecordReaderIterator}
import org.apache.spark.sql.hive.HiveUtils
import org.apache.spark.sql.hive.test.TestHive._
import org.apache.spark.sql.hive.test.TestHive.implicits._
@@ -621,4 +621,21 @@ class OrcQuerySuite extends QueryTest with BeforeAndAfterAll with OrcTest {
makeOrcFile((1 to 10).map(Tuple1.apply), path2)
assertResult(20)(read.orc(path1.getCanonicalPath, path2.getCanonicalPath).count())
}

test("SPARK-20728 Make ORCFileFormat configurable between sql/hive and sql/core") {
Seq(
(true, classOf[org.apache.spark.sql.execution.datasources.orc.OrcFileFormat]),
(false, classOf[org.apache.spark.sql.hive.orc.OrcFileFormat])).foreach { case (v, format) =>

withSQLConf(SQLConf.ORC_USE_NEW_VERSION.key -> s"$v") {
withTable("spark_20728") {
sql("CREATE TABLE spark_20728(a INT) USING ORC")
val fileFormat = sql("SELECT * FROM spark_20728").queryExecution.analyzed.collectFirst {
case l: LogicalRelation => l.relation.asInstanceOf[HadoopFsRelation].fileFormat.getClass
}
assert(fileFormat == Some(format))
}
}
}
}
}