Commit 8674054
[SPARK-16632][SQL] Use Spark requested schema to guide vectorized Parquet reader initialization
## What changes were proposed in this pull request?

In `SpecificParquetRecordReaderBase`, which is used by the vectorized Parquet reader, we convert the Parquet requested schema into a Spark schema to guide column reader initialization. However, the Parquet requested schema is tailored from the schema of the physical file being scanned, and may have inaccurate type information due to bugs in other systems (e.g. HIVE-14294).

On the other hand, we already set the real Spark requested schema into the Hadoop configuration in [`ParquetFileFormat`][1]. This PR simply reads out this schema to replace the converted one.

## How was this patch tested?

New test case added in `ParquetQuerySuite`.

[1]: https://github.com/apache/spark/blob/v2.0.0-rc5/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala#L292-L294

Author: Cheng Lian <lian@databricks.com>

Closes apache#14278 from liancheng/spark-16632-simpler-fix.
1 parent 864b764 commit 8674054
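The core idea of the fix — the planner serializes the Spark requested schema into the Hadoop configuration as a string, and the record reader parses that string back instead of trusting a schema converted from the (possibly mis-annotated) Parquet file — can be sketched in a minimal, Spark-free Python illustration. All names below are hypothetical stand-ins, not Spark or Hadoop APIs:

```python
import json

# Hypothetical stand-in for the configuration key that
# ParquetReadSupport.SPARK_ROW_REQUESTED_SCHEMA refers to in Spark.
SPARK_ROW_REQUESTED_SCHEMA = "spark.sql.parquet.row.requested_schema"


def set_requested_schema(conf, fields):
    """Planner side: store the requested schema as a JSON string in the
    configuration map, analogous to ParquetFileFormat setting it into
    the Hadoop Configuration."""
    conf[SPARK_ROW_REQUESTED_SCHEMA] = json.dumps(fields)


def init_reader(conf):
    """Reader side: parse the Spark-provided schema string back, instead
    of deriving a schema from the physical file's (possibly inaccurate)
    Parquet metadata."""
    return json.loads(conf[SPARK_ROW_REQUESTED_SCHEMA])


conf = {}
set_requested_schema(conf, [{"name": "f", "type": "byte"}])
print(init_reader(conf))  # round-trips the schema unchanged
```

The point of the design is that the reader never re-derives type information from file metadata: whatever the planner requested is what the column readers are initialized with, so a file annotated incorrectly by another writer (as in HIVE-14294) cannot override the requested types.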

File tree

2 files changed: +28 −1 lines changed

sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java

Lines changed: 4 additions & 1 deletion

```diff
@@ -60,6 +60,7 @@
 import org.apache.parquet.schema.MessageType;
 import org.apache.parquet.schema.Types;
 import org.apache.spark.sql.types.StructType;
+import org.apache.spark.sql.types.StructType$;

 /**
  * Base class for custom RecordReaders for Parquet that directly materialize to `T`.
@@ -136,7 +137,9 @@ public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptCont
     ReadSupport.ReadContext readContext = readSupport.init(new InitContext(
         taskAttemptContext.getConfiguration(), toSetMultiMap(fileMetadata), fileSchema));
     this.requestedSchema = readContext.getRequestedSchema();
-    this.sparkSchema = new ParquetSchemaConverter(configuration).convert(requestedSchema);
+    String sparkRequestedSchemaString =
+        configuration.get(ParquetReadSupport$.MODULE$.SPARK_ROW_REQUESTED_SCHEMA());
+    this.sparkSchema = StructType$.MODULE$.fromString(sparkRequestedSchemaString);
     this.reader = new ParquetFileReader(configuration, file, blocks, requestedSchema.getColumns());
     for (BlockMetaData block : blocks) {
       this.totalRowCount += block.getRowCount();
```

sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala

Lines changed: 24 additions & 0 deletions

```diff
@@ -680,6 +680,30 @@ class ParquetQuerySuite extends QueryTest with ParquetTest with SharedSQLContext
       )
     }
   }
+
+  test("SPARK-16632: read Parquet int32 as ByteType and ShortType") {
+    withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> "true") {
+      withTempPath { dir =>
+        val path = dir.getCanonicalPath
+
+        // When being written to Parquet, `TINYINT` and `SMALLINT` should be converted into
+        // `int32 (INT_8)` and `int32 (INT_16)` respectively. However, Hive doesn't add the `INT_8`
+        // and `INT_16` annotation properly (HIVE-14294). Thus, when reading files written by Hive
+        // using Spark with the vectorized Parquet reader enabled, we may hit an error due to type
+        // mismatch.
+        //
+        // Here we are simulating Hive's behavior by writing a single `INT` field and then reading
+        // it back as `TINYINT` and `SMALLINT` in Spark to verify this issue.
+        Seq(1).toDF("f").write.parquet(path)
+
+        val withByteField = new StructType().add("f", ByteType)
+        checkAnswer(spark.read.schema(withByteField).parquet(path), Row(1: Byte))
+
+        val withShortField = new StructType().add("f", ShortType)
+        checkAnswer(spark.read.schema(withShortField).parquet(path), Row(1: Short))
+      }
+    }
+  }
 }

 object TestingUDT {
```
