From ebd239eab0aa2b03b211cd470eb33d5a538f594a Mon Sep 17 00:00:00 2001
From: Dongjoon Hyun
Date: Sat, 6 Jan 2018 16:02:09 -0800
Subject: [PATCH 1/3] [SPARK-23007][SQL][TEST] Add schema evolution test suite for file-based data sources

---
 docs/sql-programming-guide.md | 48 ++
 .../datasources/SchemaEvolutionSuite.scala | 181 +++++++
 .../datasources/SchemaEvolutionTest.scala | 492 ++++++++++++++++++
 3 files changed, 721 insertions(+)
 create mode 100644 sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/SchemaEvolutionSuite.scala
 create mode 100644 sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/SchemaEvolutionTest.scala

diff --git a/docs/sql-programming-guide.md b/docs/sql-programming-guide.md
index ad23dae7c6b7..ff8540c616a4 100644
--- a/docs/sql-programming-guide.md
+++ b/docs/sql-programming-guide.md
@@ -815,6 +815,54 @@ should start with, they can set `basePath` in the data source options. For examp
 when `path/to/table/gender=male` is the path of the data and users set `basePath` to
 `path/to/table/`, `gender` will be a partitioning column.
+### Schema Evolution
+
+Users can control schema evolution in several ways. For example, a new file can have an
+additional new column. All file-based data sources (`csv`, `json`, `orc`, and `parquet`) except
+the `text` data source support this. Note that the `text` data source always has a fixed schema
+with a single string column.
+
+
+
+val df1 = Seq("a", "b").toDF("col1") +val df2 = df1.withColumn("col2", lit("x")) + +df1.write.save("/tmp/evolved_data/part=1") +df2.write.save("/tmp/evolved_data/part=2") + +spark.read.schema("col1 string, col2 string").load("/tmp/evolved_data").show ++----+----+----+ +|col1|col2|part| ++----+----+----+ +| a| x| 2| +| b| x| 2| +| a|null| 1| +| b|null| 1| ++----+----+----+ +
+
+
+ +The following schema evolutions are supported in `csv`, `json`, `orc`, and `parquet` file-based +data sources. + + 1. Add a column + 2. Hide a column + 3. Change a column position + 4. Change a column type (`byte` -> `short` -> `int` -> `long`, `float` -> `double`) + +Note that, (4) means only safe evolution from small types to larger types without data loss. + + | File Format | Support | Note | + | ------------ | ------------ | ------------------------------------------------------ | + | CSV | 1, 2, 4 | | + | JSON | 1, 2, 3, 4 | | + | ORC | 1, 2, 3, 4 | Native vectorized ORC reader has the widest coverage. | + | PARQUET | 1, 2, 3 | | + + + ### Schema Merging Like ProtocolBuffer, Avro, and Thrift, Parquet also supports schema evolution. Users can start with diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/SchemaEvolutionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/SchemaEvolutionSuite.scala new file mode 100644 index 000000000000..6e94c8822c2b --- /dev/null +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/SchemaEvolutionSuite.scala @@ -0,0 +1,181 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.datasources + +import org.apache.spark.sql.internal.SQLConf + +/** + * Schema evolution suites have the following hierarchy and aims to guarantee users + * a backward-compatible schema evolution coverage on file-based data sources, and + * to prevent future regressions. + * + * SchemaEvolutionSuite + * -> CSVSchemaEvolutionSuite + * -> HeaderCSVSchemaEvolutionSuite + * + * -> JsonSchemaEvolutionSuite + * + * -> OrcSchemaEvolutionSuite + * -> VectorizedOrcSchemaEvolutionSuite + * + * -> ParquetSchemaEvolutionSuite + * -> VectorizedParquetSchemaEvolutionSuite + * -> MergedParquetSchemaEvolutionSuite + */ + +/** + * All file-based data sources supports column addition and removal at the end. 
+ */ +abstract class SchemaEvolutionSuite + extends AddColumnEvolutionTest + with HideColumnAtTheEndEvolutionTest { + + var originalConf: Boolean = _ +} + +class CSVSchemaEvolutionSuite + extends SchemaEvolutionSuite + with IntegralTypeEvolutionTest + with ToDoubleTypeEvolutionTest + with ToDecimalTypeEvolutionTest + with ToStringTypeEvolutionTest { + + override val format: String = "csv" +} + +class HeaderCSVSchemaEvolutionSuite + extends SchemaEvolutionSuite + with IntegralTypeEvolutionTest + with ToDoubleTypeEvolutionTest + with ToDecimalTypeEvolutionTest + with ToStringTypeEvolutionTest { + + override val format: String = "csv" + + override val options = Map("header" -> "true") +} + +class JsonSchemaEvolutionSuite + extends SchemaEvolutionSuite + with HideColumnInTheMiddleEvolutionTest + with ChangePositionEvolutionTest + with IntegralTypeEvolutionTest + with ToDoubleTypeEvolutionTest + with ToDecimalTypeEvolutionTest + with ToStringTypeEvolutionTest { + + override val format: String = "json" +} + +class OrcSchemaEvolutionSuite + extends SchemaEvolutionSuite + with HideColumnInTheMiddleEvolutionTest + with ChangePositionEvolutionTest { + + override val format: String = "orc" + + override def beforeAll() { + super.beforeAll() + originalConf = spark.conf.get(SQLConf.ORC_VECTORIZED_READER_ENABLED) + spark.conf.set(SQLConf.ORC_VECTORIZED_READER_ENABLED.key, "false") + } + + override def afterAll() { + spark.conf.set(SQLConf.ORC_VECTORIZED_READER_ENABLED.key, originalConf) + super.afterAll() + } +} + +class VectorizedOrcSchemaEvolutionSuite + extends SchemaEvolutionSuite + with HideColumnInTheMiddleEvolutionTest + with ChangePositionEvolutionTest + with BooleanTypeEvolutionTest + with IntegralTypeEvolutionTest + with ToDoubleTypeEvolutionTest { + + override val format: String = "orc" + + override def beforeAll() { + super.beforeAll() + originalConf = spark.conf.get(SQLConf.ORC_VECTORIZED_READER_ENABLED) + spark.conf.set(SQLConf.ORC_VECTORIZED_READER_ENABLED.key, "true") + } + + override def afterAll() { + spark.conf.set(SQLConf.ORC_VECTORIZED_READER_ENABLED.key, originalConf) + super.afterAll() + } +} + +class ParquetSchemaEvolutionSuite + extends SchemaEvolutionSuite + with HideColumnInTheMiddleEvolutionTest + with ChangePositionEvolutionTest { + + override val format: String = "parquet" + + override def beforeAll() { + super.beforeAll() + originalConf = spark.conf.get(SQLConf.PARQUET_VECTORIZED_READER_ENABLED) + spark.conf.set(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key, "false") + } + + override def afterAll() { + spark.conf.set(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key, originalConf) + super.afterAll() + } +} + +class VectorizedParquetSchemaEvolutionSuite + extends SchemaEvolutionSuite + with HideColumnInTheMiddleEvolutionTest + with ChangePositionEvolutionTest { + + override val format: String = "parquet" + + override def beforeAll() { + super.beforeAll() + originalConf = spark.conf.get(SQLConf.PARQUET_VECTORIZED_READER_ENABLED) + spark.conf.set(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key, "true") + } + + override def afterAll() { + spark.conf.set(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key, originalConf) + super.afterAll() + } +} + +class MergedParquetSchemaEvolutionSuite + extends SchemaEvolutionSuite + with HideColumnInTheMiddleEvolutionTest + with ChangePositionEvolutionTest { + + override val format: String = "parquet" + + override def beforeAll() { + super.beforeAll() + originalConf = spark.conf.get(SQLConf.PARQUET_SCHEMA_MERGING_ENABLED) + 
spark.conf.set(SQLConf.PARQUET_SCHEMA_MERGING_ENABLED.key, "true") + } + + override def afterAll() { + spark.conf.set(SQLConf.PARQUET_SCHEMA_MERGING_ENABLED.key, originalConf) + super.afterAll() + } +} diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/SchemaEvolutionTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/SchemaEvolutionTest.scala new file mode 100644 index 000000000000..9f7b95547e03 --- /dev/null +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/SchemaEvolutionTest.scala @@ -0,0 +1,492 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.datasources + +import java.io.File + +import org.apache.spark.sql.{QueryTest, Row} +import org.apache.spark.sql.functions._ +import org.apache.spark.sql.test.{SharedSQLContext, SQLTestUtils} + +/** + * Schema can evolve in several ways and the followings are supported in file-based data sources. + * Note that partition columns are not maintained in files. Here, `column` means non-partition + * column. + * + * 1. Add a column + * 2. Hide a column + * 3. Change a column position + * 4. Change a column type (Upcast) + * + * Here, we consider safe evolution without data loss. For example, data type evolution should be + * from small types to larger types like `int`-to-`long`, not vice versa. + * + * So far, file-based data sources have schema evolution coverages like the followings. + * + * | File Format | Coverage | Note | + * | ------------ | ------------ | ------------------------------------------------------ | + * | TEXT | N/A | Schema consists of a single string column. | + * | CSV | 1, 2, 4 | | + * | JSON | 1, 2, 3, 4 | | + * | ORC | 1, 2, 3, 4 | Native vectorized ORC reader has the widest coverage. | + * | PARQUET | 1, 2, 3 | | + * + * This aims to provide an explicit test coverage for schema evolution on file-based data sources. + * Since a file format has its own coverage of schema evolution, we need a test suite + * for each file-based data source with corresponding supported test case traits. + * + * The following is a hierarchy of test traits. + * + * SchemaEvolutionTest + * -> AddColumnEvolutionTest + * -> HideColumnEvolutionTest + * -> ChangePositionEvolutionTest + * -> BooleanTypeEvolutionTest + * -> IntegralTypeEvolutionTest + * -> ToDoubleTypeEvolutionTest + * -> ToDecimalTypeEvolutionTest + */ + +trait SchemaEvolutionTest extends QueryTest with SQLTestUtils with SharedSQLContext { + val format: String + val options: Map[String, String] = Map.empty[String, String] +} + +/** + * Add column (Case 1). + * This test suite assumes that the missing column should be `null`. 
+ */ +trait AddColumnEvolutionTest extends SchemaEvolutionTest { + import testImplicits._ + + test("append column at the end") { + withTempPath { dir => + val path = dir.getCanonicalPath + + val df1 = Seq("a", "b").toDF("col1") + val df2 = df1.withColumn("col2", lit("x")) + val df3 = df2.withColumn("col3", lit("y")) + + val dir1 = s"$path${File.separator}part=one" + val dir2 = s"$path${File.separator}part=two" + val dir3 = s"$path${File.separator}part=three" + + df1.write.format(format).options(options).save(dir1) + df2.write.format(format).options(options).save(dir2) + df3.write.format(format).options(options).save(dir3) + + val df = spark.read + .schema(df3.schema) + .format(format) + .options(options) + .load(path) + + checkAnswer(df, Seq( + Row("a", null, null, "one"), + Row("b", null, null, "one"), + Row("a", "x", null, "two"), + Row("b", "x", null, "two"), + Row("a", "x", "y", "three"), + Row("b", "x", "y", "three"))) + } + } +} + +/** + * Hide column (Case 2-1). + */ +trait HideColumnAtTheEndEvolutionTest extends SchemaEvolutionTest { + import testImplicits._ + + test("hide column at the end") { + withTempPath { dir => + val path = dir.getCanonicalPath + + val df1 = Seq(("1", "a"), ("2", "b")).toDF("col1", "col2") + val df2 = df1.withColumn("col3", lit("y")) + + val dir1 = s"$path${File.separator}part=two" + val dir2 = s"$path${File.separator}part=three" + + df1.write.format(format).options(options).save(dir1) + df2.write.format(format).options(options).save(dir2) + + val df = spark.read + .schema(df1.schema) + .format(format) + .options(options) + .load(path) + + checkAnswer(df, Seq( + Row("1", "a", "two"), + Row("2", "b", "two"), + Row("1", "a", "three"), + Row("2", "b", "three"))) + + val df3 = spark.read + .schema("col1 string") + .format(format) + .options(options) + .load(path) + + checkAnswer(df3, Seq( + Row("1", "two"), + Row("2", "two"), + Row("1", "three"), + Row("2", "three"))) + } + } +} + +/** + * Hide column in the middle (Case 2-2). + */ +trait HideColumnInTheMiddleEvolutionTest extends SchemaEvolutionTest { + import testImplicits._ + + test("hide column in the middle") { + withTempPath { dir => + val path = dir.getCanonicalPath + + val df1 = Seq(("1", "a"), ("2", "b")).toDF("col1", "col2") + val df2 = df1.withColumn("col3", lit("y")) + + val dir1 = s"$path${File.separator}part=two" + val dir2 = s"$path${File.separator}part=three" + + df1.write.format(format).options(options).save(dir1) + df2.write.format(format).options(options).save(dir2) + + val df = spark.read + .schema("col2 string") + .format(format) + .options(options) + .load(path) + + checkAnswer(df, Seq( + Row("a", "two"), + Row("b", "two"), + Row("a", "three"), + Row("b", "three"))) + } + } +} + +/** + * Change column positions (Case 3). + * This suite assumes that all data set have the same number of columns. 
+ */ +trait ChangePositionEvolutionTest extends SchemaEvolutionTest { + import testImplicits._ + + test("change column position") { + withTempPath { dir => + val path = dir.getCanonicalPath + + val df1 = Seq(("1", "a"), ("2", "b"), ("3", "c")).toDF("col1", "col2") + val df2 = Seq(("d", "4"), ("e", "5"), ("f", "6")).toDF("col2", "col1") + val unionDF = df1.unionByName(df2) + + val dir1 = s"$path${File.separator}part=one" + val dir2 = s"$path${File.separator}part=two" + + df1.write.format(format).options(options).save(dir1) + df2.write.format(format).options(options).save(dir2) + + val df = spark.read + .schema(unionDF.schema) + .format(format) + .options(options) + .load(path) + .select("col1", "col2") + + checkAnswer(df, unionDF) + } + } +} + +/** + * Change a column type (Case 4). + * This suite assumes that a user gives a wider schema intentionally. + */ +trait BooleanTypeEvolutionTest extends SchemaEvolutionTest { + import testImplicits._ + + test("change column type from boolean to byte/short/int/long") { + withTempPath { dir => + val path = dir.getCanonicalPath + + val values = (1 to 10).map(_ % 2) + val booleanDF = (1 to 10).map(_ % 2 == 1).toDF("col1") + val byteDF = values.map(_.toByte).toDF("col1") + val shortDF = values.map(_.toShort).toDF("col1") + val intDF = values.toDF("col1") + val longDF = values.map(_.toLong).toDF("col1") + + booleanDF.write.mode("overwrite").format(format).options(options).save(path) + + Seq( + ("col1 byte", byteDF), + ("col1 short", shortDF), + ("col1 int", intDF), + ("col1 long", longDF)).foreach { case (schema, answerDF) => + checkAnswer(spark.read.schema(schema).format(format).options(options).load(path), answerDF) + } + } + } +} + +/** + * Change a column type (Case 4). + * This suite assumes that a user gives a wider schema intentionally. + */ +trait ToStringTypeEvolutionTest extends SchemaEvolutionTest { + import testImplicits._ + + test("read as string") { + withTempPath { dir => + val path = dir.getCanonicalPath + + val byteDF = (Byte.MaxValue - 2 to Byte.MaxValue).map(_.toByte).toDF("col1") + val shortDF = (Short.MaxValue - 2 to Short.MaxValue).map(_.toShort).toDF("col1") + val intDF = (Int.MaxValue - 2 to Int.MaxValue).toDF("col1") + val longDF = (Long.MaxValue - 2 to Long.MaxValue).toDF("col1") + val unionDF = byteDF.union(shortDF).union(intDF).union(longDF) + .selectExpr("cast(col1 AS STRING) col1") + + val byteDir = s"$path${File.separator}part=byte" + val shortDir = s"$path${File.separator}part=short" + val intDir = s"$path${File.separator}part=int" + val longDir = s"$path${File.separator}part=long" + + byteDF.write.format(format).options(options).save(byteDir) + shortDF.write.format(format).options(options).save(shortDir) + intDF.write.format(format).options(options).save(intDir) + longDF.write.format(format).options(options).save(longDir) + + val df = spark.read + .schema("col1 string") + .format(format) + .options(options) + .load(path) + .select("col1") + + checkAnswer(df, unionDF) + } + } +} + +/** + * Change a column type (Case 4). + * This suite assumes that a user gives a wider schema intentionally. 
+ */ +trait IntegralTypeEvolutionTest extends SchemaEvolutionTest { + + import testImplicits._ + + private lazy val values = 1 to 10 + private lazy val byteDF = values.map(_.toByte).toDF("col1") + private lazy val shortDF = values.map(_.toShort).toDF("col1") + private lazy val intDF = values.toDF("col1") + private lazy val longDF = values.map(_.toLong).toDF("col1") + + test("change column type from byte to short/int/long") { + withTempPath { dir => + val path = dir.getCanonicalPath + + byteDF.write.format(format).options(options).save(path) + + Seq( + ("col1 short", shortDF), + ("col1 int", intDF), + ("col1 long", longDF)).foreach { case (schema, answerDF) => + checkAnswer(spark.read.schema(schema).format(format).options(options).load(path), answerDF) + } + } + } + + test("change column type from short to int/long") { + withTempPath { dir => + val path = dir.getCanonicalPath + + shortDF.write.format(format).options(options).save(path) + + Seq(("col1 int", intDF), ("col1 long", longDF)).foreach { case (schema, answerDF) => + checkAnswer(spark.read.schema(schema).format(format).options(options).load(path), answerDF) + } + } + } + + test("change column type from int to long") { + withTempPath { dir => + val path = dir.getCanonicalPath + + intDF.write.format(format).options(options).save(path) + + Seq(("col1 long", longDF)).foreach { case (schema, answerDF) => + checkAnswer(spark.read.schema(schema).format(format).options(options).load(path), answerDF) + } + } + } + + test("read byte, int, short, long together") { + withTempPath { dir => + val path = dir.getCanonicalPath + + val byteDF = (Byte.MaxValue - 2 to Byte.MaxValue).map(_.toByte).toDF("col1") + val shortDF = (Short.MaxValue - 2 to Short.MaxValue).map(_.toShort).toDF("col1") + val intDF = (Int.MaxValue - 2 to Int.MaxValue).toDF("col1") + val longDF = (Long.MaxValue - 2 to Long.MaxValue).toDF("col1") + val unionDF = byteDF.union(shortDF).union(intDF).union(longDF) + + val byteDir = s"$path${File.separator}part=byte" + val shortDir = s"$path${File.separator}part=short" + val intDir = s"$path${File.separator}part=int" + val longDir = s"$path${File.separator}part=long" + + byteDF.write.format(format).options(options).save(byteDir) + shortDF.write.format(format).options(options).save(shortDir) + intDF.write.format(format).options(options).save(intDir) + longDF.write.format(format).options(options).save(longDir) + + val df = spark.read + .schema(unionDF.schema) + .format(format) + .options(options) + .load(path) + .select("col1") + + checkAnswer(df, unionDF) + } + } +} + +/** + * Change a column type (Case 4). + * This suite assumes that a user gives a wider schema intentionally. 
+ */ +trait ToDoubleTypeEvolutionTest extends SchemaEvolutionTest { + import testImplicits._ + + private lazy val values = 1 to 10 + private lazy val floatDF = values.map(_.toFloat).toDF("col1") + private lazy val doubleDF = values.map(_.toDouble).toDF("col1") + private lazy val unionDF = floatDF.union(doubleDF) + + test("change column type from float to double") { + withTempPath { dir => + val path = dir.getCanonicalPath + + floatDF.write.format(format).options(options).save(path) + + val df = spark.read.schema("col1 double").format(format).options(options).load(path) + + checkAnswer(df, doubleDF) + } + } + + test("read float and double together") { + withTempPath { dir => + val path = dir.getCanonicalPath + + val floatDir = s"$path${File.separator}part=float" + val doubleDir = s"$path${File.separator}part=double" + + floatDF.write.format(format).options(options).save(floatDir) + doubleDF.write.format(format).options(options).save(doubleDir) + + val df = spark.read + .schema(unionDF.schema) + .format(format) + .options(options) + .load(path) + .select("col1") + + checkAnswer(df, unionDF) + } + } +} + +/** + * Change a column type (Case 4). + * This suite assumes that a user gives a wider schema intentionally. + */ +trait ToDecimalTypeEvolutionTest extends SchemaEvolutionTest { + import testImplicits._ + + private lazy val values = 1 to 10 + private lazy val floatDF = values.map(_.toFloat).toDF("col1") + private lazy val doubleDF = values.map(_.toDouble).toDF("col1") + private lazy val decimalDF = values.map(BigDecimal(_)).toDF("col1") + private lazy val unionDF = floatDF.union(doubleDF).union(decimalDF) + + test("change column type from float to decimal") { + withTempPath { dir => + val path = dir.getCanonicalPath + + floatDF.write.format(format).options(options).save(path) + + val df = spark.read + .schema("col1 decimal(38,18)") + .format(format) + .options(options) + .load(path) + + checkAnswer(df, decimalDF) + } + } + + test("change column type from double to decimal") { + withTempPath { dir => + val path = dir.getCanonicalPath + + doubleDF.write.format(format).options(options).save(path) + + val df = spark.read + .schema("col1 decimal(38,18)") + .format(format) + .options(options) + .load(path) + + checkAnswer(df, decimalDF) + } + } + + test("read float, double, decimal together") { + withTempPath { dir => + val path = dir.getCanonicalPath + + val floatDir = s"$path${File.separator}part=float" + val doubleDir = s"$path${File.separator}part=double" + val decimalDir = s"$path${File.separator}part=decimal" + + floatDF.write.format(format).options(options).save(floatDir) + doubleDF.write.format(format).options(options).save(doubleDir) + decimalDF.write.format(format).options(options).save(decimalDir) + + val df = spark.read + .schema(unionDF.schema) + .format(format) + .options(options) + .load(path) + .select("col1") + + checkAnswer(df, unionDF) + } + } +} From 767d7ba9b5d5cfd659461ae8cf1e735aa0551f18 Mon Sep 17 00:00:00 2001 From: Dongjoon Hyun Date: Tue, 10 Jul 2018 14:04:49 -0700 Subject: [PATCH 2/3] Address comments --- docs/sql-programming-guide.md | 48 -------- ...utionSuite.scala => ReadSchemaSuite.scala} | 114 +++++++++--------- ...olutionTest.scala => ReadSchemaTest.scala} | 49 ++++---- 3 files changed, 82 insertions(+), 129 deletions(-) rename sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/{SchemaEvolutionSuite.scala => ReadSchemaSuite.scala} (59%) rename sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/{SchemaEvolutionTest.scala => 
ReadSchemaTest.scala} (90%)

diff --git a/docs/sql-programming-guide.md b/docs/sql-programming-guide.md
index ff8540c616a4..ad23dae7c6b7 100644
--- a/docs/sql-programming-guide.md
+++ b/docs/sql-programming-guide.md
@@ -815,54 +815,6 @@ should start with, they can set `basePath` in the data source options. For examp
 when `path/to/table/gender=male` is the path of the data and users set `basePath` to
 `path/to/table/`, `gender` will be a partitioning column.
-### Schema Evolution
-
-Users can control schema evolution in several ways. For example, a new file can have an
-additional new column. All file-based data sources (`csv`, `json`, `orc`, and `parquet`) except
-the `text` data source support this. Note that the `text` data source always has a fixed schema
-with a single string column.
-
-
-
-val df1 = Seq("a", "b").toDF("col1") -val df2 = df1.withColumn("col2", lit("x")) - -df1.write.save("/tmp/evolved_data/part=1") -df2.write.save("/tmp/evolved_data/part=2") - -spark.read.schema("col1 string, col2 string").load("/tmp/evolved_data").show -+----+----+----+ -|col1|col2|part| -+----+----+----+ -| a| x| 2| -| b| x| 2| -| a|null| 1| -| b|null| 1| -+----+----+----+ -
-
-
- -The following schema evolutions are supported in `csv`, `json`, `orc`, and `parquet` file-based -data sources. - - 1. Add a column - 2. Hide a column - 3. Change a column position - 4. Change a column type (`byte` -> `short` -> `int` -> `long`, `float` -> `double`) - -Note that, (4) means only safe evolution from small types to larger types without data loss. - - | File Format | Support | Note | - | ------------ | ------------ | ------------------------------------------------------ | - | CSV | 1, 2, 4 | | - | JSON | 1, 2, 3, 4 | | - | ORC | 1, 2, 3, 4 | Native vectorized ORC reader has the widest coverage. | - | PARQUET | 1, 2, 3 | | - - - ### Schema Merging Like ProtocolBuffer, Avro, and Thrift, Parquet also supports schema evolution. Users can start with diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/SchemaEvolutionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/ReadSchemaSuite.scala similarity index 59% rename from sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/SchemaEvolutionSuite.scala rename to sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/ReadSchemaSuite.scala index 6e94c8822c2b..f54a348bb3a7 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/SchemaEvolutionSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/ReadSchemaSuite.scala @@ -20,72 +20,72 @@ package org.apache.spark.sql.execution.datasources import org.apache.spark.sql.internal.SQLConf /** - * Schema evolution suites have the following hierarchy and aims to guarantee users - * a backward-compatible schema evolution coverage on file-based data sources, and + * Read schema suites have the following hierarchy and aims to guarantee users + * a backward-compatible read-schema change coverage on file-based data sources, and * to prevent future regressions. * - * SchemaEvolutionSuite - * -> CSVSchemaEvolutionSuite - * -> HeaderCSVSchemaEvolutionSuite + * ReadSchemaSuite + * -> CSVReadSchemaEvolutionSuite + * -> HeaderCSVReadSchemaEvolutionSuite * - * -> JsonSchemaEvolutionSuite + * -> JsonReadSchemaEvolutionSuite * - * -> OrcSchemaEvolutionSuite - * -> VectorizedOrcSchemaEvolutionSuite + * -> OrcReadSchemaEvolutionSuite + * -> VectorizedOrcReadSchemaEvolutionSuite * - * -> ParquetSchemaEvolutionSuite - * -> VectorizedParquetSchemaEvolutionSuite - * -> MergedParquetSchemaEvolutionSuite + * -> ParquetReadSchemaEvolutionSuite + * -> VectorizedParquetReadSchemaEvolutionSuite + * -> MergedParquetReadSchemaEvolutionSuite */ /** * All file-based data sources supports column addition and removal at the end. 
*/ -abstract class SchemaEvolutionSuite - extends AddColumnEvolutionTest - with HideColumnAtTheEndEvolutionTest { +abstract class ReadSchemaSuite + extends AddColumnTest + with HideColumnAtTheEndTest { var originalConf: Boolean = _ } -class CSVSchemaEvolutionSuite - extends SchemaEvolutionSuite - with IntegralTypeEvolutionTest - with ToDoubleTypeEvolutionTest - with ToDecimalTypeEvolutionTest - with ToStringTypeEvolutionTest { +class CSVReadSchemaSuite + extends ReadSchemaSuite + with IntegralTypeTest + with ToDoubleTypeTest + with ToDecimalTypeTest + with ToStringTypeTest { override val format: String = "csv" } -class HeaderCSVSchemaEvolutionSuite - extends SchemaEvolutionSuite - with IntegralTypeEvolutionTest - with ToDoubleTypeEvolutionTest - with ToDecimalTypeEvolutionTest - with ToStringTypeEvolutionTest { +class HeaderCSVReadSchemaSuite + extends ReadSchemaSuite + with IntegralTypeTest + with ToDoubleTypeTest + with ToDecimalTypeTest + with ToStringTypeTest { override val format: String = "csv" override val options = Map("header" -> "true") } -class JsonSchemaEvolutionSuite - extends SchemaEvolutionSuite - with HideColumnInTheMiddleEvolutionTest - with ChangePositionEvolutionTest - with IntegralTypeEvolutionTest - with ToDoubleTypeEvolutionTest - with ToDecimalTypeEvolutionTest - with ToStringTypeEvolutionTest { +class JsonReadSchemaSuite + extends ReadSchemaSuite + with HideColumnInTheMiddleTest + with ChangePositionTest + with IntegralTypeTest + with ToDoubleTypeTest + with ToDecimalTypeTest + with ToStringTypeTest { override val format: String = "json" } -class OrcSchemaEvolutionSuite - extends SchemaEvolutionSuite - with HideColumnInTheMiddleEvolutionTest - with ChangePositionEvolutionTest { +class OrcReadSchemaSuite + extends ReadSchemaSuite + with HideColumnInTheMiddleTest + with ChangePositionTest { override val format: String = "orc" @@ -101,13 +101,13 @@ class OrcSchemaEvolutionSuite } } -class VectorizedOrcSchemaEvolutionSuite - extends SchemaEvolutionSuite - with HideColumnInTheMiddleEvolutionTest - with ChangePositionEvolutionTest - with BooleanTypeEvolutionTest - with IntegralTypeEvolutionTest - with ToDoubleTypeEvolutionTest { +class VectorizedOrcReadSchemaSuite + extends ReadSchemaSuite + with HideColumnInTheMiddleTest + with ChangePositionTest + with BooleanTypeTest + with IntegralTypeTest + with ToDoubleTypeTest { override val format: String = "orc" @@ -123,10 +123,10 @@ class VectorizedOrcSchemaEvolutionSuite } } -class ParquetSchemaEvolutionSuite - extends SchemaEvolutionSuite - with HideColumnInTheMiddleEvolutionTest - with ChangePositionEvolutionTest { +class ParquetReadSchemaSuite + extends ReadSchemaSuite + with HideColumnInTheMiddleTest + with ChangePositionTest { override val format: String = "parquet" @@ -142,10 +142,10 @@ class ParquetSchemaEvolutionSuite } } -class VectorizedParquetSchemaEvolutionSuite - extends SchemaEvolutionSuite - with HideColumnInTheMiddleEvolutionTest - with ChangePositionEvolutionTest { +class VectorizedParquetReadSchemaSuite + extends ReadSchemaSuite + with HideColumnInTheMiddleTest + with ChangePositionTest { override val format: String = "parquet" @@ -161,10 +161,10 @@ class VectorizedParquetSchemaEvolutionSuite } } -class MergedParquetSchemaEvolutionSuite - extends SchemaEvolutionSuite - with HideColumnInTheMiddleEvolutionTest - with ChangePositionEvolutionTest { +class MergedParquetReadSchemaSuite + extends ReadSchemaSuite + with HideColumnInTheMiddleTest + with ChangePositionTest { override val format: String = "parquet" diff 
--git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/SchemaEvolutionTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/ReadSchemaTest.scala similarity index 90% rename from sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/SchemaEvolutionTest.scala rename to sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/ReadSchemaTest.scala index 9f7b95547e03..2a5457e00b4e 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/SchemaEvolutionTest.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/ReadSchemaTest.scala @@ -24,7 +24,8 @@ import org.apache.spark.sql.functions._ import org.apache.spark.sql.test.{SharedSQLContext, SQLTestUtils} /** - * Schema can evolve in several ways and the followings are supported in file-based data sources. + * The reader schema is said to be evolved (or projected) when it changed after the data is + * written by writers. The followings are supported in file-based data sources. * Note that partition columns are not maintained in files. Here, `column` means non-partition * column. * @@ -33,10 +34,10 @@ import org.apache.spark.sql.test.{SharedSQLContext, SQLTestUtils} * 3. Change a column position * 4. Change a column type (Upcast) * - * Here, we consider safe evolution without data loss. For example, data type evolution should be + * Here, we consider safe changes without data loss. For example, data type changes should be * from small types to larger types like `int`-to-`long`, not vice versa. * - * So far, file-based data sources have schema evolution coverages like the followings. + * So far, file-based data sources have the following coverages. * * | File Format | Coverage | Note | * | ------------ | ------------ | ------------------------------------------------------ | @@ -46,23 +47,23 @@ import org.apache.spark.sql.test.{SharedSQLContext, SQLTestUtils} * | ORC | 1, 2, 3, 4 | Native vectorized ORC reader has the widest coverage. | * | PARQUET | 1, 2, 3 | | * - * This aims to provide an explicit test coverage for schema evolution on file-based data sources. - * Since a file format has its own coverage of schema evolution, we need a test suite - * for each file-based data source with corresponding supported test case traits. + * This aims to provide an explicit test coverage for reader schema change on file-based data + * sources. Since a file format has its own coverage, we need a test suite for each file-based + * data source with corresponding supported test case traits. * * The following is a hierarchy of test traits. * - * SchemaEvolutionTest - * -> AddColumnEvolutionTest - * -> HideColumnEvolutionTest - * -> ChangePositionEvolutionTest - * -> BooleanTypeEvolutionTest - * -> IntegralTypeEvolutionTest - * -> ToDoubleTypeEvolutionTest - * -> ToDecimalTypeEvolutionTest + * ReadSchemaTest + * -> AddColumnTest + * -> HideColumnTest + * -> ChangePositionTest + * -> BooleanTypeTest + * -> IntegralTypeTest + * -> ToDoubleTypeTest + * -> ToDecimalTypeTest */ -trait SchemaEvolutionTest extends QueryTest with SQLTestUtils with SharedSQLContext { +trait ReadSchemaTest extends QueryTest with SQLTestUtils with SharedSQLContext { val format: String val options: Map[String, String] = Map.empty[String, String] } @@ -71,7 +72,7 @@ trait SchemaEvolutionTest extends QueryTest with SQLTestUtils with SharedSQLCont * Add column (Case 1). * This test suite assumes that the missing column should be `null`. 
*/ -trait AddColumnEvolutionTest extends SchemaEvolutionTest { +trait AddColumnTest extends ReadSchemaTest { import testImplicits._ test("append column at the end") { @@ -110,7 +111,7 @@ trait AddColumnEvolutionTest extends SchemaEvolutionTest { /** * Hide column (Case 2-1). */ -trait HideColumnAtTheEndEvolutionTest extends SchemaEvolutionTest { +trait HideColumnAtTheEndTest extends ReadSchemaTest { import testImplicits._ test("hide column at the end") { @@ -156,7 +157,7 @@ trait HideColumnAtTheEndEvolutionTest extends SchemaEvolutionTest { /** * Hide column in the middle (Case 2-2). */ -trait HideColumnInTheMiddleEvolutionTest extends SchemaEvolutionTest { +trait HideColumnInTheMiddleTest extends ReadSchemaTest { import testImplicits._ test("hide column in the middle") { @@ -191,7 +192,7 @@ trait HideColumnInTheMiddleEvolutionTest extends SchemaEvolutionTest { * Change column positions (Case 3). * This suite assumes that all data set have the same number of columns. */ -trait ChangePositionEvolutionTest extends SchemaEvolutionTest { +trait ChangePositionTest extends ReadSchemaTest { import testImplicits._ test("change column position") { @@ -224,7 +225,7 @@ trait ChangePositionEvolutionTest extends SchemaEvolutionTest { * Change a column type (Case 4). * This suite assumes that a user gives a wider schema intentionally. */ -trait BooleanTypeEvolutionTest extends SchemaEvolutionTest { +trait BooleanTypeTest extends ReadSchemaTest { import testImplicits._ test("change column type from boolean to byte/short/int/long") { @@ -255,7 +256,7 @@ trait BooleanTypeEvolutionTest extends SchemaEvolutionTest { * Change a column type (Case 4). * This suite assumes that a user gives a wider schema intentionally. */ -trait ToStringTypeEvolutionTest extends SchemaEvolutionTest { +trait ToStringTypeTest extends ReadSchemaTest { import testImplicits._ test("read as string") { @@ -295,7 +296,7 @@ trait ToStringTypeEvolutionTest extends SchemaEvolutionTest { * Change a column type (Case 4). * This suite assumes that a user gives a wider schema intentionally. */ -trait IntegralTypeEvolutionTest extends SchemaEvolutionTest { +trait IntegralTypeTest extends ReadSchemaTest { import testImplicits._ @@ -380,7 +381,7 @@ trait IntegralTypeEvolutionTest extends SchemaEvolutionTest { * Change a column type (Case 4). * This suite assumes that a user gives a wider schema intentionally. */ -trait ToDoubleTypeEvolutionTest extends SchemaEvolutionTest { +trait ToDoubleTypeTest extends ReadSchemaTest { import testImplicits._ private lazy val values = 1 to 10 @@ -426,7 +427,7 @@ trait ToDoubleTypeEvolutionTest extends SchemaEvolutionTest { * Change a column type (Case 4). * This suite assumes that a user gives a wider schema intentionally. 
*/ -trait ToDecimalTypeEvolutionTest extends SchemaEvolutionTest { +trait ToDecimalTypeTest extends ReadSchemaTest { import testImplicits._ private lazy val values = 1 to 10 From a7064ac0bc7b56ffffa3e322f31bda8a45bd9517 Mon Sep 17 00:00:00 2001 From: Dongjoon Hyun Date: Tue, 10 Jul 2018 14:21:26 -0700 Subject: [PATCH 3/3] fix comments --- .../execution/datasources/ReadSchemaSuite.scala | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/ReadSchemaSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/ReadSchemaSuite.scala index f54a348bb3a7..23c58e175fe5 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/ReadSchemaSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/ReadSchemaSuite.scala @@ -25,17 +25,17 @@ import org.apache.spark.sql.internal.SQLConf * to prevent future regressions. * * ReadSchemaSuite - * -> CSVReadSchemaEvolutionSuite - * -> HeaderCSVReadSchemaEvolutionSuite + * -> CSVReadSchemaSuite + * -> HeaderCSVReadSchemaSuite * - * -> JsonReadSchemaEvolutionSuite + * -> JsonReadSchemaSuite * - * -> OrcReadSchemaEvolutionSuite - * -> VectorizedOrcReadSchemaEvolutionSuite + * -> OrcReadSchemaSuite + * -> VectorizedOrcReadSchemaSuite * - * -> ParquetReadSchemaEvolutionSuite - * -> VectorizedParquetReadSchemaEvolutionSuite - * -> MergedParquetReadSchemaEvolutionSuite + * -> ParquetReadSchemaSuite + * -> VectorizedParquetReadSchemaSuite + * -> MergedParquetReadSchemaSuite */ /**
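
For reference, the read-schema behavior pinned down by these suites can be reproduced in a few lines of spark-shell code. The sketch below is illustrative only and not part of the patch: the `json` source and the `/tmp/read_schema_demo` path are assumptions chosen for the example. It writes two files with different column orders and a narrower numeric type, then reads both back with one user-specified schema, exercising cases (3) and (4) from the guide.

// Illustrative sketch, not part of the patch. Assumes a running spark-shell
// session (`spark` and its implicits) and a writable /tmp directory.
import spark.implicits._

// Two files: different column orders, and `col1` written as int in one file and long in the other.
Seq((1, "a"), (2, "b")).toDF("col1", "col2")
  .write.format("json").save("/tmp/read_schema_demo/part=one")
Seq(("c", 3L), ("d", 4L)).toDF("col2", "col1")
  .write.format("json").save("/tmp/read_schema_demo/part=two")

// One read schema for both files: columns are matched by name (case 3) and
// `col1` is upcast to long (case 4); `part` is appended as a partition column.
spark.read
  .schema("col1 long, col2 string")
  .format("json")
  .load("/tmp/read_schema_demo")
  .orderBy("col1")
  .show()
// +----+----+----+
// |col1|col2|part|
// +----+----+----+
// |   1|   a| one|
// |   2|   b| one|
// |   3|   c| two|
// |   4|   d| two|
// +----+----+----+

Swapping in `format("orc")` exercises the same cases through the ORC suites; per the coverage table in the guide, `csv` would support the upcast but not the by-name reordering.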