From 2d74b204b85db1ffcfb164a160e8f6f0d02d3f4b Mon Sep 17 00:00:00 2001 From: Dongjoon Hyun Date: Wed, 14 Feb 2018 10:25:19 -0800 Subject: [PATCH 1/7] Use 'hive' for ORC --- docs/sql-programming-guide.md | 2 +- .../main/scala/org/apache/spark/sql/internal/SQLConf.scala | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/docs/sql-programming-guide.md b/docs/sql-programming-guide.md index 0f9f01e18682..e84523f2f931 100644 --- a/docs/sql-programming-guide.md +++ b/docs/sql-programming-guide.md @@ -1784,7 +1784,7 @@ working with timestamps in `pandas_udf`s to get the best performance, see Property NameDefaultMeaning spark.sql.orc.impl - native + hive The name of ORC implementation. It can be one of native and hive. native means the native ORC support that is built on Apache ORC 1.4.1. `hive` means the ORC library in Hive 1.2.1 which is used prior to Spark 2.3. diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index 7835dbaa5843..e910f8f68c4b 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -399,11 +399,11 @@ object SQLConf { val ORC_IMPLEMENTATION = buildConf("spark.sql.orc.impl") .doc("When native, use the native version of ORC support instead of the ORC library in Hive " + - "1.2.1. It is 'hive' by default prior to Spark 2.3.") + "1.2.1. It is 'hive' by default.") .internal() .stringConf .checkValues(Set("hive", "native")) - .createWithDefault("native") + .createWithDefault("hive") val ORC_VECTORIZED_READER_ENABLED = buildConf("spark.sql.orc.enableVectorizedReader") .doc("Enables vectorized orc decoding.") From 93e6c7d9b3de383c83d82ae7e0c07ed17233bc27 Mon Sep 17 00:00:00 2001 From: Dongjoon Hyun Date: Wed, 14 Feb 2018 10:33:35 -0800 Subject: [PATCH 2/7] Disable PPD back. --- .../src/main/scala/org/apache/spark/sql/internal/SQLConf.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index e910f8f68c4b..f24fd7ff74d3 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -426,7 +426,7 @@ object SQLConf { val ORC_FILTER_PUSHDOWN_ENABLED = buildConf("spark.sql.orc.filterPushdown") .doc("When true, enable filter pushdown for ORC files.") .booleanConf - .createWithDefault(true) + .createWithDefault(false) val HIVE_VERIFY_PARTITION_PATH = buildConf("spark.sql.hive.verifyPartitionPath") .doc("When true, check all the partition paths under the table\'s root directory " + From 7ff4ccf115437dadcda761890522a38960c5fde6 Mon Sep 17 00:00:00 2001 From: Dongjoon Hyun Date: Wed, 14 Feb 2018 10:38:55 -0800 Subject: [PATCH 3/7] Add ORC section. --- docs/sql-programming-guide.md | 47 ++++++++++++++--------------------- 1 file changed, 18 insertions(+), 29 deletions(-) diff --git a/docs/sql-programming-guide.md b/docs/sql-programming-guide.md index e84523f2f931..54f71e4410aa 100644 --- a/docs/sql-programming-guide.md +++ b/docs/sql-programming-guide.md @@ -1004,6 +1004,24 @@ Configuration of Parquet can be done using the `setConf` method on `SparkSession +## ORC Files + +Since Spark 2.3, Spark supports a vectorized ORC reader with a new ORC file format for ORC files. 
To do that, the following configurations are newly added. The vectorized reader is used for the native ORC tables (e.g., the ones created using the clause `USING ORC`) when `spark.sql.orc.impl` is set to `native` and `spark.sql.orc.enableVectorizedReader` is set to `true`. For the Hive ORC serde table (e.g., the ones created using the clause `USING HIVE OPTIONS (fileFormat 'ORC')`), the vectorized reader is used when `spark.sql.hive.convertMetastoreOrc` is set to `true`.
+
+<table class="table">
+  <tr><th><b>Property Name</b></th><th><b>Default</b></th><th><b>Meaning</b></th></tr>
+  <tr>
+    <td><code>spark.sql.orc.impl</code></td>
+    <td><code>hive</code></td>
+    <td>The name of ORC implementation. It can be one of <code>native</code> and <code>hive</code>. <code>native</code> means the native ORC support that is built on Apache ORC 1.4.1. <code>hive</code> means the ORC library in Hive 1.2.1 which is used prior to Spark 2.3.</td>
+  </tr>
+  <tr>
+    <td><code>spark.sql.orc.enableVectorizedReader</code></td>
+    <td><code>true</code></td>
+    <td>Enables vectorized ORC decoding in <code>native</code> implementation. If <code>false</code>, a new non-vectorized ORC reader is used in <code>native</code> implementation. For <code>hive</code> implementation, this is ignored.</td>
+  </tr>
+</table>
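For example, the settings above combine as follows in a session; a minimal Scala sketch (the table name and output path are illustrative, and `spark` is an existing `SparkSession`):

```scala
// Select the native, vectorized ORC implementation for this session
// (illustrative values, not part of this patch).
spark.conf.set("spark.sql.orc.impl", "native")
spark.conf.set("spark.sql.orc.enableVectorizedReader", "true")

// A native ORC data source table, eligible for the vectorized reader.
spark.sql("CREATE TABLE points(x INT, y INT) USING ORC")

// Path-based ORC reads and writes go through the same implementation.
spark.range(10).write.orc("/tmp/orc-demo")
spark.read.orc("/tmp/orc-demo").show()
```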
+
 ## JSON Datasets
@@ -1776,35 +1794,6 @@ working with timestamps in `pandas_udf`s to get the best performance, see

 ## Upgrading From Spark SQL 2.2 to 2.3

-  - Since Spark 2.3, Spark supports a vectorized ORC reader with a new ORC file format for ORC files. To do that, the following configurations are newly added or change their default values. The vectorized reader is used for the native ORC tables (e.g., the ones created using the clause `USING ORC`) when `spark.sql.orc.impl` is set to `native` and `spark.sql.orc.enableVectorizedReader` is set to `true`. For the Hive ORC serde table (e.g., the ones created using the clause `USING HIVE OPTIONS (fileFormat 'ORC')`), the vectorized reader is used when `spark.sql.hive.convertMetastoreOrc` is set to `true`.
-
-    - New configurations
-
-    <table class="table">
-      <tr><th><b>Property Name</b></th><th><b>Default</b></th><th><b>Meaning</b></th></tr>
-      <tr>
-        <td><code>spark.sql.orc.impl</code></td>
-        <td><code>hive</code></td>
-        <td>The name of ORC implementation. It can be one of <code>native</code> and <code>hive</code>. <code>native</code> means the native ORC support that is built on Apache ORC 1.4.1. `hive` means the ORC library in Hive 1.2.1 which is used prior to Spark 2.3.</td>
-      </tr>
-      <tr>
-        <td><code>spark.sql.orc.enableVectorizedReader</code></td>
-        <td><code>true</code></td>
-        <td>Enables vectorized orc decoding in <code>native</code> implementation. If <code>false</code>, a new non-vectorized ORC reader is used in <code>native</code> implementation. For <code>hive</code> implementation, this is ignored.</td>
-      </tr>
-    </table>
-
-    - Changed configurations
-
-    <table class="table">
-      <tr><th><b>Property Name</b></th><th><b>Default</b></th><th><b>Meaning</b></th></tr>
-      <tr>
-        <td><code>spark.sql.orc.filterPushdown</code></td>
-        <td><code>true</code></td>
-        <td>Enables filter pushdown for ORC files. It is <code>false</code> by default prior to Spark 2.3.</td>
-      </tr>
-    </table>
- - Since Spark 2.3, the queries from raw JSON/CSV files are disallowed when the referenced columns only include the internal corrupt record column (named `_corrupt_record` by default). For example, `spark.read.schema(schema).json(file).filter($"_corrupt_record".isNotNull).count()` and `spark.read.schema(schema).json(file).select("_corrupt_record").show()`. Instead, you can cache or save the parsed results and then send the same query. For example, `val df = spark.read.schema(schema).json(file).cache()` and then `df.filter($"_corrupt_record".isNotNull).count()`. - The `percentile_approx` function previously accepted numeric type input and output double type results. Now it supports date type, timestamp type and numeric types as input types. The result type is also changed to be the same as the input type, which is more reasonable for percentiles. - Since Spark 2.3, the Join/Filter's deterministic predicates that are after the first non-deterministic predicates are also pushed down/through the child operators, if possible. In prior Spark versions, these filters are not eligible for predicate pushdown. From 46c8697b1981f57eeacb48bea31dec1e89f4e66a Mon Sep 17 00:00:00 2001 From: Dongjoon Hyun Date: Wed, 14 Feb 2018 13:49:24 -0800 Subject: [PATCH 4/7] fix test case. --- .../spark/sql/FileBasedDataSourceSuite.scala | 14 +++++++++++++- .../spark/sql/streaming/FileStreamSinkSuite.scala | 10 ++++++++++ .../sql/streaming/FileStreamSourceSuite.scala | 10 ++++++++++ 3 files changed, 33 insertions(+), 1 deletion(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala index 2e332362ea64..4ccd681d99a3 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala @@ -20,14 +20,26 @@ package org.apache.spark.sql import java.io.FileNotFoundException import org.apache.hadoop.fs.Path +import org.scalatest.BeforeAndAfterAll import org.apache.spark.SparkException import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.test.SharedSQLContext -class FileBasedDataSourceSuite extends QueryTest with SharedSQLContext { + +class FileBasedDataSourceSuite extends QueryTest with SharedSQLContext with BeforeAndAfterAll { import testImplicits._ + override def beforeAll(): Unit = { + super.beforeAll() + spark.sessionState.conf.setConf(SQLConf.ORC_IMPLEMENTATION, "native") + } + + override def afterAll(): Unit = { + spark.sessionState.conf.unsetConf(SQLConf.ORC_IMPLEMENTATION) + super.afterAll() + } + private val allFileBasedDataSources = Seq("orc", "parquet", "csv", "json", "text") private val nameWithSpecialChars = "sp&cial%c hars" diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala index 8c4e1fd00b0a..5a3857208ecf 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala @@ -33,6 +33,16 @@ import org.apache.spark.util.Utils class FileStreamSinkSuite extends StreamTest { import testImplicits._ + override def beforeAll(): Unit = { + super.beforeAll() + spark.sessionState.conf.setConf(SQLConf.ORC_IMPLEMENTATION, "native") + } + + override def afterAll(): Unit = { + spark.sessionState.conf.unsetConf(SQLConf.ORC_IMPLEMENTATION) + super.afterAll() + 
} + test("unpartitioned writing and batch reading") { val inputData = MemoryStream[Int] val df = inputData.toDF() diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala index 5bb0f4d643bb..028b7bbf9a64 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala @@ -207,6 +207,16 @@ class FileStreamSourceSuite extends FileStreamSourceTest { .collect { case s @ StreamingRelation(dataSource, _, _) => s.schema }.head } + override def beforeAll(): Unit = { + super.beforeAll() + spark.sessionState.conf.setConf(SQLConf.ORC_IMPLEMENTATION, "native") + } + + override def afterAll(): Unit = { + spark.sessionState.conf.unsetConf(SQLConf.ORC_IMPLEMENTATION) + super.afterAll() + } + // ============= Basic parameter exists tests ================ test("FileStreamSource schema: no path") { From 276963369b663c99aebcde10f4329f479f43b3ea Mon Sep 17 00:00:00 2001 From: Dongjoon Hyun Date: Wed, 14 Feb 2018 16:35:53 -0800 Subject: [PATCH 5/7] Address comment. --- docs/sql-programming-guide.md | 7 ++++++- .../org/apache/spark/sql/FileBasedDataSourceSuite.scala | 7 +++++-- .../apache/spark/sql/streaming/FileStreamSinkSuite.scala | 7 +++++-- .../apache/spark/sql/streaming/FileStreamSourceSuite.scala | 7 +++++-- 4 files changed, 21 insertions(+), 7 deletions(-) diff --git a/docs/sql-programming-guide.md b/docs/sql-programming-guide.md index 54f71e4410aa..37c08f4029dd 100644 --- a/docs/sql-programming-guide.md +++ b/docs/sql-programming-guide.md @@ -1006,7 +1006,12 @@ Configuration of Parquet can be done using the `setConf` method on `SparkSession ## ORC Files -Since Spark 2.3, Spark supports a vectorized ORC reader with a new ORC file format for ORC files. To do that, the following configurations are newly added. The vectorized reader is used for the native ORC tables (e.g., the ones created using the clause `USING ORC`) when `spark.sql.orc.impl` is set to `native` and `spark.sql.orc.enableVectorizedReader` is set to `true`. For the Hive ORC serde table (e.g., the ones created using the clause `USING HIVE OPTIONS (fileFormat 'ORC')`), the vectorized reader is used when `spark.sql.hive.convertMetastoreOrc` is set to `true`. +Since Spark 2.3, Spark supports a vectorized ORC reader with a new ORC file format for ORC files. +To do that, the following configurations are newly added. The vectorized reader is used for the +native ORC tables (e.g., the ones created using the clause `USING ORC`) when `spark.sql.orc.impl` +is set to `native` and `spark.sql.orc.enableVectorizedReader` is set to `true`. For the Hive ORC +serde tables (e.g., the ones created using the clause `USING HIVE OPTIONS (fileFormat 'ORC')`), +the vectorized reader is used when `spark.sql.hive.convertMetastoreOrc` is set to `true`. 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala index 4ccd681d99a3..b5d4c558f0d3 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala @@ -36,8 +36,11 @@ class FileBasedDataSourceSuite extends QueryTest with SharedSQLContext with Befo } override def afterAll(): Unit = { - spark.sessionState.conf.unsetConf(SQLConf.ORC_IMPLEMENTATION) - super.afterAll() + try { + spark.sessionState.conf.unsetConf(SQLConf.ORC_IMPLEMENTATION) + } finally { + super.afterAll() + } } private val allFileBasedDataSources = Seq("orc", "parquet", "csv", "json", "text") diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala index 5a3857208ecf..ba48bc1ce0c4 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala @@ -39,8 +39,11 @@ class FileStreamSinkSuite extends StreamTest { } override def afterAll(): Unit = { - spark.sessionState.conf.unsetConf(SQLConf.ORC_IMPLEMENTATION) - super.afterAll() + try { + spark.sessionState.conf.unsetConf(SQLConf.ORC_IMPLEMENTATION) + } finally { + super.afterAll() + } } test("unpartitioned writing and batch reading") { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala index 028b7bbf9a64..d4bd9c7987f2 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala @@ -213,8 +213,11 @@ class FileStreamSourceSuite extends FileStreamSourceTest { } override def afterAll(): Unit = { - spark.sessionState.conf.unsetConf(SQLConf.ORC_IMPLEMENTATION) - super.afterAll() + try { + spark.sessionState.conf.unsetConf(SQLConf.ORC_IMPLEMENTATION) + } finally { + super.afterAll() + } } // ============= Basic parameter exists tests ================ From 183ec213b02ad528cb016e67ecc2bfb6394668f1 Mon Sep 17 00:00:00 2001 From: Dongjoon Hyun Date: Wed, 14 Feb 2018 19:05:15 -0800 Subject: [PATCH 6/7] Add `also`. --- docs/sql-programming-guide.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/sql-programming-guide.md b/docs/sql-programming-guide.md index 37c08f4029dd..648d54d83fa2 100644 --- a/docs/sql-programming-guide.md +++ b/docs/sql-programming-guide.md @@ -1011,7 +1011,7 @@ To do that, the following configurations are newly added. The vectorized reader native ORC tables (e.g., the ones created using the clause `USING ORC`) when `spark.sql.orc.impl` is set to `native` and `spark.sql.orc.enableVectorizedReader` is set to `true`. For the Hive ORC serde tables (e.g., the ones created using the clause `USING HIVE OPTIONS (fileFormat 'ORC')`), -the vectorized reader is used when `spark.sql.hive.convertMetastoreOrc` is set to `true`. +the vectorized reader is used when `spark.sql.hive.convertMetastoreOrc` is also set to `true`.
 <table class="table">
   <tr><th><b>Property Name</b></th><th><b>Default</b></th><th><b>Meaning</b></th></tr>
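For the Hive serde path described in the amended paragraph, the conversion flag composes with the two ORC settings above; a hedged Scala sketch, assuming a Hive-enabled session (the table name is illustrative):

```scala
// Hive ORC serde tables take the vectorized path only when the metastore
// conversion is also enabled, in addition to spark.sql.orc.impl=native
// and spark.sql.orc.enableVectorizedReader=true.
spark.conf.set("spark.sql.hive.convertMetastoreOrc", "true")
spark.sql("CREATE TABLE hive_pts(x INT, y INT) USING HIVE OPTIONS (fileFormat 'ORC')")
spark.read.table("hive_pts").count()  // served by the converted, vectorized reader
```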
From 19b50b1eb5dcdf02ecd515b5d27d0256c7f4a3ab Mon Sep 17 00:00:00 2001 From: Dongjoon Hyun Date: Thu, 15 Feb 2018 00:04:44 -0800 Subject: [PATCH 7/7] Remove. --- docs/sql-programming-guide.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/sql-programming-guide.md b/docs/sql-programming-guide.md index 648d54d83fa2..ea57a0c7dec0 100644 --- a/docs/sql-programming-guide.md +++ b/docs/sql-programming-guide.md @@ -1018,7 +1018,7 @@ the vectorized reader is used when `spark.sql.hive.convertMetastoreOrc` is also - +
   <tr><th><b>Property Name</b></th><th><b>Default</b></th><th><b>Meaning</b></th></tr>
   <tr>
     <td><code>spark.sql.orc.impl</code></td>
     <td><code>hive</code></td>
-    <td>The name of ORC implementation. It can be one of <code>native</code> and <code>hive</code>. <code>native</code> means the native ORC support that is built on Apache ORC 1.4.1. <code>hive</code> means the ORC library in Hive 1.2.1 which is used prior to Spark 2.3.</td>
+    <td>The name of ORC implementation. It can be one of <code>native</code> and <code>hive</code>. <code>native</code> means the native ORC support that is built on Apache ORC 1.4.1. <code>hive</code> means the ORC library in Hive 1.2.1.</td>
   </tr>
   <tr>
     <td><code>spark.sql.orc.enableVectorizedReader</code></td>
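Patches 4 and 5 converge on a single fixture pattern for the affected suites: pin `spark.sql.orc.impl` to `native` in `beforeAll`, and unset it inside `try`/`finally` in `afterAll` so that `super.afterAll()` runs even if the unset fails. A condensed sketch of that pattern (the suite name is illustrative):

```scala
import org.apache.spark.sql.QueryTest
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.SharedSQLContext

// Condensed from the suites patched above; the suite name is illustrative.
class NativeOrcExampleSuite extends QueryTest with SharedSQLContext {

  override def beforeAll(): Unit = {
    super.beforeAll()
    // Force the native ORC implementation for every test in this suite.
    spark.sessionState.conf.setConf(SQLConf.ORC_IMPLEMENTATION, "native")
  }

  override def afterAll(): Unit = {
    try {
      // Restore the session default so the setting cannot leak into other suites.
      spark.sessionState.conf.unsetConf(SQLConf.ORC_IMPLEMENTATION)
    } finally {
      super.afterAll()
    }
  }
}
```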