[SPARK-23007][SQL][TEST] Add read schema suite for file-based data sources #20208
@@ -815,6 +815,54 @@
should start with, they can set `basePath` in the data source options. For example,
when `path/to/table/gender=male` is the path of the data and
users set `basePath` to `path/to/table/`, `gender` will be a partitioning column.
### Schema Evolution

Users can control schema evolution in several ways. For example, a new file can have an
additional new column. All file-based data sources (`csv`, `json`, `orc`, and `parquet`)
except the `text` data source support this. Note that the `text` data source always has a
fixed single string column schema.

<div class="codetabs">

<div data-lang="scala" markdown="1">
{% highlight scala %}
import org.apache.spark.sql.functions.lit

val df1 = Seq("a", "b").toDF("col1")
val df2 = df1.withColumn("col2", lit("x"))

df1.write.save("/tmp/evolved_data/part=1")
df2.write.save("/tmp/evolved_data/part=2")

spark.read.schema("col1 string, col2 string").load("/tmp/evolved_data").show()
// +----+----+----+
// |col1|col2|part|
// +----+----+----+
// |   a|   x|   2|
// |   b|   x|   2|
// |   a|null|   1|
// |   b|null|   1|
// +----+----+----+
{% endhighlight %}
</div>

</div>

The directory names `part=1` and `part=2` are discovered as values of a partitioning
column `part`, and the files written without `col2` are read back with `null` for that
column.
The following schema evolutions are supported in the `csv`, `json`, `orc`, and `parquet`
file-based data sources:

1. Add a column
2. Hide a column
3. Change a column position
4. Change a column type (`byte` -> `short` -> `int` -> `long`, `float` -> `double`)

Note that (4) means only safe evolution from smaller types to larger types, without data
loss.
| File Format | Support    | Note                                                      |
| ----------- | ---------- | --------------------------------------------------------- |
| CSV         | 1, 2, 4    |                                                           |
| JSON        | 1, 2, 3, 4 |                                                           |
| ORC         | 1, 2, 3, 4 | The native vectorized ORC reader has the widest coverage. |
| PARQUET     | 1, 2, 3    |                                                           |
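For example, type widening (4) with the `orc` data source looks like the following
(a minimal sketch; the path and output are illustrative):

{% highlight scala %}
// Write integer data, then read it back with a wider `long` schema.
Seq(1, 2, 3).toDF("col1").write.orc("/tmp/widening_data")
spark.read.schema("col1 long").orc("/tmp/widening_data").show()
// +----+
// |col1|
// +----+
// |   1|
// |   2|
// |   3|
// +----+
{% endhighlight %}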
### Schema Merging

Like ProtocolBuffer, Avro, and Thrift, Parquet also supports schema evolution. Users can start with
@@ -0,0 +1,181 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.execution.datasources

import org.apache.spark.sql.internal.SQLConf
/**
 * The schema evolution suites have the following hierarchy. They aim to guarantee
 * backward-compatible schema evolution coverage for file-based data sources and
 * to prevent future regressions.
 *
 *   SchemaEvolutionSuite
 *     -> CSVSchemaEvolutionSuite
 *       -> HeaderCSVSchemaEvolutionSuite
 *
 *     -> JsonSchemaEvolutionSuite
 *
 *     -> OrcSchemaEvolutionSuite
 *       -> VectorizedOrcSchemaEvolutionSuite
 *
 *     -> ParquetSchemaEvolutionSuite
 *       -> VectorizedParquetSchemaEvolutionSuite
 *       -> MergedParquetSchemaEvolutionSuite
 */
/**
 * All file-based data sources support column addition and removal at the end.
 */
abstract class SchemaEvolutionSuite
  extends AddColumnEvolutionTest
  with HideColumnAtTheEndEvolutionTest {

  // Holds the original value of the SQL conf toggled by a concrete suite,
  // so that afterAll() can restore it.
  var originalConf: Boolean = _
}
class CSVSchemaEvolutionSuite
  extends SchemaEvolutionSuite
  with IntegralTypeEvolutionTest
  with ToDoubleTypeEvolutionTest
  with ToDecimalTypeEvolutionTest
  with ToStringTypeEvolutionTest {

  override val format: String = "csv"
}
class HeaderCSVSchemaEvolutionSuite
  extends SchemaEvolutionSuite
  with IntegralTypeEvolutionTest
  with ToDoubleTypeEvolutionTest
  with ToDecimalTypeEvolutionTest
  with ToStringTypeEvolutionTest {

  override val format: String = "csv"

  override val options = Map("header" -> "true")
}
class JsonSchemaEvolutionSuite
  extends SchemaEvolutionSuite
  with HideColumnInTheMiddleEvolutionTest
  with ChangePositionEvolutionTest
  with IntegralTypeEvolutionTest
  with ToDoubleTypeEvolutionTest
  with ToDecimalTypeEvolutionTest
  with ToStringTypeEvolutionTest {

  override val format: String = "json"
}
class OrcSchemaEvolutionSuite
  extends SchemaEvolutionSuite
  with HideColumnInTheMiddleEvolutionTest
  with ChangePositionEvolutionTest {

  override val format: String = "orc"

  override def beforeAll(): Unit = {
    super.beforeAll()
    // Save the current setting and force the non-vectorized ORC reader.
    originalConf = spark.conf.get(SQLConf.ORC_VECTORIZED_READER_ENABLED)
    spark.conf.set(SQLConf.ORC_VECTORIZED_READER_ENABLED.key, "false")
  }

  override def afterAll(): Unit = {
    spark.conf.set(SQLConf.ORC_VECTORIZED_READER_ENABLED.key, originalConf)
    super.afterAll()
  }
}
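// The ORC/Parquet suites toggle a session conf for the whole suite in
// beforeAll()/afterAll() because the setting must cover every mixed-in test.
// For a single test body, Spark's SQLTestUtils also provides a scoped helper;
// a hypothetical usage sketch (not part of this PR):
//
//   test("vectorized ORC read") {
//     withSQLConf(SQLConf.ORC_VECTORIZED_READER_ENABLED.key -> "true") {
//       // ... read and verify ...
//     }
//   }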
class VectorizedOrcSchemaEvolutionSuite
  extends SchemaEvolutionSuite
  with HideColumnInTheMiddleEvolutionTest
  with ChangePositionEvolutionTest
  with BooleanTypeEvolutionTest
  with IntegralTypeEvolutionTest
  with ToDoubleTypeEvolutionTest {

  override val format: String = "orc"

  override def beforeAll(): Unit = {
    super.beforeAll()
    originalConf = spark.conf.get(SQLConf.ORC_VECTORIZED_READER_ENABLED)
    spark.conf.set(SQLConf.ORC_VECTORIZED_READER_ENABLED.key, "true")
  }

  override def afterAll(): Unit = {
    spark.conf.set(SQLConf.ORC_VECTORIZED_READER_ENABLED.key, originalConf)
    super.afterAll()
  }
}
class ParquetSchemaEvolutionSuite
  extends SchemaEvolutionSuite
  with HideColumnInTheMiddleEvolutionTest
  with ChangePositionEvolutionTest {

  override val format: String = "parquet"

  override def beforeAll(): Unit = {
    super.beforeAll()
    // Save the current setting and force the non-vectorized Parquet reader.
    originalConf = spark.conf.get(SQLConf.PARQUET_VECTORIZED_READER_ENABLED)
    spark.conf.set(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key, "false")
  }

  override def afterAll(): Unit = {
    spark.conf.set(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key, originalConf)
    super.afterAll()
  }
}
class VectorizedParquetSchemaEvolutionSuite
  extends SchemaEvolutionSuite
  with HideColumnInTheMiddleEvolutionTest
  with ChangePositionEvolutionTest {

  override val format: String = "parquet"

  override def beforeAll(): Unit = {
    super.beforeAll()
    originalConf = spark.conf.get(SQLConf.PARQUET_VECTORIZED_READER_ENABLED)
    spark.conf.set(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key, "true")
  }

  override def afterAll(): Unit = {
    spark.conf.set(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key, originalConf)
    super.afterAll()
  }
}
class MergedParquetSchemaEvolutionSuite
  extends SchemaEvolutionSuite
  with HideColumnInTheMiddleEvolutionTest
  with ChangePositionEvolutionTest {

  override val format: String = "parquet"

  override def beforeAll(): Unit = {
    super.beforeAll()
    // Enable Parquet schema merging for every test in this suite.
    originalConf = spark.conf.get(SQLConf.PARQUET_SCHEMA_MERGING_ENABLED)
    spark.conf.set(SQLConf.PARQUET_SCHEMA_MERGING_ENABLED.key, "true")
  }

  override def afterAll(): Unit = {
    spark.conf.set(SQLConf.PARQUET_SCHEMA_MERGING_ENABLED.key, originalConf)
    super.afterAll()
  }
}
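The evolution-test traits mixed into these suites (`AddColumnEvolutionTest`,
`HideColumnAtTheEndEvolutionTest`, and the rest) are defined elsewhere in this PR and are
not shown in this diff. As a rough illustration of the pattern, a hypothetical
`AddColumnEvolutionTest` could look like the sketch below; everything beyond the `format`
and `options` members is an assumption built on Spark's shared test harness (`QueryTest`,
`SharedSQLContext`).

```scala
import org.apache.spark.sql.{QueryTest, Row}
import org.apache.spark.sql.test.SharedSQLContext

// Hypothetical sketch only -- the real traits live in other files of this PR.
trait AddColumnEvolutionTest extends QueryTest with SharedSQLContext {
  import testImplicits._

  // Supplied by the concrete suites.
  val format: String
  val options: Map[String, String] = Map.empty

  test("add a column at the end of the schema") {
    withTempPath { dir =>
      val path = dir.getCanonicalPath
      // Older data has one column; newer data appends `col2`.
      Seq("a", "b").toDF("col1")
        .write.format(format).options(options).save(s"$path/part=1")
      Seq(("c", "x")).toDF("col1", "col2")
        .write.format(format).options(options).save(s"$path/part=2")

      // Reading with the wider schema should fill the missing column with nulls.
      val df = spark.read.schema("col1 string, col2 string")
        .format(format).options(options).load(path)
      checkAnswer(df.filter("part = 1").select("col2"), Seq(Row(null), Row(null)))
    }
  }
}
```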
@gatorsmile, I rebased onto master and added this.
Based on the current behavior, we do not support schema evolution. Schema evolution is a well-defined term. It sounds like this PR is trying to test the behavior when users provide a schema that does not exactly match the physical schema. That is different from the definition of schema evolution.
Thank you so much for the review, @gatorsmile. I have waited for this moment. :)
I agree with all of your comments. The main reason for those limitations is that Spark's file-based data sources don't have the capability to manage multi-version schemas or column default values. In fact, that is beyond the role of Spark's data sources. Thus, this PR tries to add test coverage for the as-is capability, in order to prevent future regressions and to lay a foundation we can trust and build on later. I don't think this is worth documenting at this stage.