@@ -41,12 +41,14 @@ import org.apache.spark.unsafe.types.UTF8String
/**
* A deserializer to deserialize data in avro format to data in catalyst format.
*/
class AvroDeserializer(rootAvroType: Schema, rootCatalystType: DataType) {
private lazy val decimalConversions = new DecimalConversion()
class AvroDeserializer(rootAvroType: Schema, rootCatalystType: DataType, rebaseDateTime: Boolean) {

def this(rootAvroType: Schema, rootCatalystType: DataType) {
this(rootAvroType, rootCatalystType,
SQLConf.get.getConf(SQLConf.LEGACY_AVRO_REBASE_DATETIME_IN_READ))
}

// Enable rebasing date/timestamp from Julian to Proleptic Gregorian calendar
private val rebaseDateTime =
SQLConf.get.getConf(SQLConf.LEGACY_AVRO_REBASE_DATETIME_IN_READ)
private lazy val decimalConversions = new DecimalConversion()

private val converter: Any => Any = rootCatalystType match {
// A shortcut for empty schema.
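The hunk above only threads the new `rebaseDateTime` flag into the deserializer; the converters that consume it are outside the diff. As a rough sketch of what the flag controls on the read path — assuming the `RebaseDateTime` helpers from `org.apache.spark.sql.catalyst.util`, which this PR also touches on the Parquet side — the conversion amounts to something like this (illustrative, not the exact converter code):

```scala
import org.apache.spark.sql.catalyst.util.RebaseDateTime

// Sketch only: Avro stores dates as days since the epoch and timestamps as
// micros/millis since the epoch. When rebaseDateTime is true, reinterpret the
// raw values from the legacy hybrid Julian calendar into Proleptic Gregorian.
def readDays(avroDays: Int, rebaseDateTime: Boolean): Int =
  if (rebaseDateTime) RebaseDateTime.rebaseJulianToGregorianDays(avroDays) else avroDays

def readMicros(avroMicros: Long, rebaseDateTime: Boolean): Long =
  if (rebaseDateTime) RebaseDateTime.rebaseJulianToGregorianMicros(avroMicros) else avroMicros
```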
@@ -34,7 +34,8 @@ import org.apache.spark.TaskContext
import org.apache.spark.internal.Logging
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.execution.datasources.{FileFormat, OutputWriterFactory, PartitionedFile}
import org.apache.spark.sql.execution.datasources.{DataSourceUtils, FileFormat, OutputWriterFactory, PartitionedFile}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.sources.{DataSourceRegister, Filter}
import org.apache.spark.sql.types._
import org.apache.spark.util.SerializableConfiguration
@@ -123,8 +124,12 @@ private[sql] class AvroFileFormat extends FileFormat
reader.sync(file.start)
val stop = file.start + file.length

val deserializer =
new AvroDeserializer(userProvidedSchema.getOrElse(reader.getSchema), requiredSchema)
val rebaseDateTime = DataSourceUtils.needRebaseDateTime(
reader.asInstanceOf[DataFileReader[_]].getMetaString).getOrElse {
SQLConf.get.getConf(SQLConf.LEGACY_AVRO_REBASE_DATETIME_IN_READ)
}
val deserializer = new AvroDeserializer(
userProvidedSchema.getOrElse(reader.getSchema), requiredSchema, rebaseDateTime)

new Iterator[InternalRow] {
private[this] var completed = false
@@ -29,9 +29,10 @@ import org.apache.hadoop.io.NullWritable
import org.apache.hadoop.mapreduce.{RecordWriter, TaskAttemptContext}

import org.apache.spark.SPARK_VERSION_SHORT
import org.apache.spark.sql.SPARK_VERSION_METADATA_KEY
import org.apache.spark.sql.{SPARK_LEGACY_DATETIME, SPARK_VERSION_METADATA_KEY}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.execution.datasources.OutputWriter
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._

// NOTE: This class is instantiated and used on executor side only, no need to be serializable.
@@ -41,16 +42,24 @@ private[avro] class AvroOutputWriter(
schema: StructType,
avroSchema: Schema) extends OutputWriter {

// Whether to rebase datetimes from Gregorian to Julian calendar in write
private val rebaseDateTime: Boolean =
SQLConf.get.getConf(SQLConf.LEGACY_AVRO_REBASE_DATETIME_IN_WRITE)

// The input rows will never be null.
private lazy val serializer = new AvroSerializer(schema, avroSchema, nullable = false)
private lazy val serializer =
new AvroSerializer(schema, avroSchema, nullable = false, rebaseDateTime)

/**
* Overrides the couple of methods responsible for generating the output streams / files so
* that the data can be correctly partitioned
*/
private val recordWriter: RecordWriter[AvroKey[GenericRecord], NullWritable] = {
val sparkVersion = Map(SPARK_VERSION_METADATA_KEY -> SPARK_VERSION_SHORT).asJava
new SparkAvroKeyOutputFormat(sparkVersion) {
val fileMeta = Map(SPARK_VERSION_METADATA_KEY -> SPARK_VERSION_SHORT) ++ {
if (rebaseDateTime) Some(SPARK_LEGACY_DATETIME -> "") else None
}

new SparkAvroKeyOutputFormat(fileMeta.asJava) {

override def getDefaultWorkFile(context: TaskAttemptContext, extension: String): Path = {
new Path(path)
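Only the presence of the `SPARK_LEGACY_DATETIME` key matters, which is why it is written with an empty string value. If you need to inspect a written file by hand, a small sketch using the plain Avro API (the helper name and the idea of passing the key in are assumptions, not part of this PR):

```scala
import java.io.File
import org.apache.avro.file.DataFileReader
import org.apache.avro.generic.GenericDatumReader

// Hypothetical helper: returns true if the Avro file carries the legacy
// datetime marker in its metadata, i.e. it was written with rebase enabled.
def hasLegacyDateTimeMarker(file: File, markerKey: String): Boolean = {
  val reader = DataFileReader.openReader(file, new GenericDatumReader[Any]())
  try reader.getMetaString(markerKey) != null
  finally reader.close()
}
```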
@@ -42,12 +42,16 @@ import org.apache.spark.sql.types._
/**
* A serializer to serialize data in catalyst format to data in avro format.
*/
class AvroSerializer(rootCatalystType: DataType, rootAvroType: Schema, nullable: Boolean)
extends Logging {
class AvroSerializer(
rootCatalystType: DataType,
rootAvroType: Schema,
nullable: Boolean,
rebaseDateTime: Boolean) extends Logging {

// Whether to rebase datetimes from Gregorian to Julian calendar in write
private val rebaseDateTime: Boolean =
SQLConf.get.getConf(SQLConf.LEGACY_AVRO_REBASE_DATETIME_IN_WRITE)
def this(rootCatalystType: DataType, rootAvroType: Schema, nullable: Boolean) {
this(rootCatalystType, rootAvroType, nullable,
SQLConf.get.getConf(SQLConf.LEGACY_AVRO_REBASE_DATETIME_IN_WRITE))
}

def serialize(catalystData: Any): Any = {
converter.apply(catalystData)
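Mirroring the read side, the serializer only receives the flag here; what it controls is the opposite rebase on write — from Catalyst's Proleptic Gregorian calendar back to the hybrid Julian calendar that Spark 2.4 and earlier assume. A sketch under the same assumption that the `RebaseDateTime` helpers do the work:

```scala
import org.apache.spark.sql.catalyst.util.RebaseDateTime

// Sketch only: rebase Catalyst values to the legacy calendar before they are
// written out, so that old readers interpret pre-1582 dates correctly.
def writeDays(catalystDays: Int, rebaseDateTime: Boolean): Int =
  if (rebaseDateTime) RebaseDateTime.rebaseGregorianToJulianDays(catalystDays) else catalystDays

def writeMicros(catalystMicros: Long, rebaseDateTime: Boolean): Long =
  if (rebaseDateTime) RebaseDateTime.rebaseGregorianToJulianMicros(catalystMicros) else catalystMicros
```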
@@ -32,7 +32,7 @@ import org.apache.spark.internal.Logging
import org.apache.spark.sql.avro.{AvroDeserializer, AvroOptions}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.connector.read.PartitionReader
import org.apache.spark.sql.execution.datasources.PartitionedFile
import org.apache.spark.sql.execution.datasources.{DataSourceUtils, PartitionedFile}
import org.apache.spark.sql.execution.datasources.v2.{EmptyPartitionReader, FilePartitionReaderFactory, PartitionReaderWithPartitionValues}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.StructType
@@ -88,8 +88,12 @@ case class AvroPartitionReaderFactory(
reader.sync(partitionedFile.start)
val stop = partitionedFile.start + partitionedFile.length

val deserializer =
new AvroDeserializer(userProvidedSchema.getOrElse(reader.getSchema), readDataSchema)
val rebaseDateTime = DataSourceUtils.needRebaseDateTime(
reader.asInstanceOf[DataFileReader[_]].getMetaString).getOrElse {
SQLConf.get.getConf(SQLConf.LEGACY_AVRO_REBASE_DATETIME_IN_READ)
}
val deserializer = new AvroDeserializer(
userProvidedSchema.getOrElse(reader.getSchema), readDataSchema, rebaseDateTime)

val fileReader = new PartitionReader[InternalRow] {
private[this] var completed = false
@@ -288,7 +288,7 @@ class AvroCatalystDataConversionSuite extends SparkFunSuite
""".stripMargin
val avroSchema = new Schema.Parser().parse(jsonFormatSchema)
val dataType = SchemaConverters.toSqlType(avroSchema).dataType
val deserializer = new AvroDeserializer(avroSchema, dataType)
val deserializer = new AvroDeserializer(avroSchema, dataType, rebaseDateTime = false)

def checkDeserialization(data: GenericData.Record, expected: Any): Unit = {
assert(checkResult(
external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala (103 changes: 80 additions & 23 deletions)
@@ -84,9 +84,8 @@ abstract class AvroSuite extends QueryTest with SharedSparkSession {
}, new GenericDatumReader[Any]()).getSchema.toString(false)
}

private def readResourceAvroFile(name: String): DataFrame = {
val url = Thread.currentThread().getContextClassLoader.getResource(name)
spark.read.format("avro").load(url.toString)
private def getResourceAvroFilePath(name: String): String = {
Thread.currentThread().getContextClassLoader.getResource(name).toString
}

test("resolve avro data source") {
@@ -1530,16 +1529,50 @@ abstract class AvroSuite extends QueryTest with SharedSparkSession {
}

test("SPARK-31183: compatibility with Spark 2.4 in reading dates/timestamps") {
// test reading the existing 2.4 files and new 3.0 files (with rebase on/off) together.
def checkReadMixedFiles(fileName: String, dt: String, dataStr: String): Unit = {
withTempPaths(2) { paths =>
paths.foreach(_.delete())
val path2_4 = getResourceAvroFilePath(fileName)
val path3_0 = paths(0).getCanonicalPath
val path3_0_rebase = paths(1).getCanonicalPath
if (dt == "date") {
val df = Seq(dataStr).toDF("str").select($"str".cast("date").as("date"))
df.write.format("avro").save(path3_0)
withSQLConf(SQLConf.LEGACY_AVRO_REBASE_DATETIME_IN_WRITE.key -> "true") {
df.write.format("avro").save(path3_0_rebase)
}
checkAnswer(
spark.read.format("avro").load(path2_4, path3_0, path3_0_rebase),
1.to(3).map(_ => Row(java.sql.Date.valueOf(dataStr))))
} else {
val df = Seq(dataStr).toDF("str").select($"str".cast("timestamp").as("ts"))
val avroSchema =
s"""
|{
| "type" : "record",
| "name" : "test_schema",
| "fields" : [
| {"name": "ts", "type": {"type": "long", "logicalType": "$dt"}}
| ]
|}""".stripMargin
df.write.format("avro").option("avroSchema", avroSchema).save(path3_0)
withSQLConf(SQLConf.LEGACY_AVRO_REBASE_DATETIME_IN_WRITE.key -> "true") {
df.write.format("avro").option("avroSchema", avroSchema).save(path3_0_rebase)
}
checkAnswer(
spark.read.format("avro").load(path2_4, path3_0, path3_0_rebase),
1.to(3).map(_ => Row(java.sql.Timestamp.valueOf(dataStr))))
}
}
}

withSQLConf(SQLConf.LEGACY_AVRO_REBASE_DATETIME_IN_READ.key -> "true") {
checkAnswer(
readResourceAvroFile("before_1582_date_v2_4.avro"),
Row(java.sql.Date.valueOf("1001-01-01")))
checkAnswer(
readResourceAvroFile("before_1582_ts_micros_v2_4.avro"),
Row(java.sql.Timestamp.valueOf("1001-01-01 01:02:03.123456")))
checkAnswer(
readResourceAvroFile("before_1582_ts_millis_v2_4.avro"),
Row(java.sql.Timestamp.valueOf("1001-01-01 01:02:03.124")))
checkReadMixedFiles("before_1582_date_v2_4.avro", "date", "1001-01-01")
checkReadMixedFiles(
"before_1582_ts_micros_v2_4.avro", "timestamp-micros", "1001-01-01 01:02:03.123456")
checkReadMixedFiles(
"before_1582_ts_millis_v2_4.avro", "timestamp-millis", "1001-01-01 01:02:03.124")
}
}

@@ -1554,10 +1587,18 @@ abstract class AvroSuite extends QueryTest with SharedSparkSession {
.write.format("avro")
.save(path)
}
withSQLConf(SQLConf.LEGACY_AVRO_REBASE_DATETIME_IN_READ.key -> "true") {
checkAnswer(spark.read.format("avro").load(path), Row(Timestamp.valueOf(tsStr)))

// The file metadata indicates if it needs rebase or not, so we can always get the correct
// result regardless of the "rebaseInRead" config.
Seq(true, false).foreach { rebase =>
withSQLConf(SQLConf.LEGACY_AVRO_REBASE_DATETIME_IN_READ.key -> rebase.toString) {
checkAnswer(spark.read.format("avro").load(path), Row(Timestamp.valueOf(tsStr)))
}
}
withSQLConf(SQLConf.LEGACY_AVRO_REBASE_DATETIME_IN_READ.key -> "false") {

// Force rebase off to prove that the written datetime values are rebased, and that we get the
// wrong result if we don't rebase them back while reading.
withSQLConf("spark.test.forceNoRebase" -> "true") {
checkAnswer(spark.read.format("avro").load(path), Row(Timestamp.valueOf(nonRebased)))
}
}
@@ -1589,12 +1630,20 @@ abstract class AvroSuite extends QueryTest with SharedSparkSession {
.format("avro")
.save(path)
}
withSQLConf(SQLConf.LEGACY_AVRO_REBASE_DATETIME_IN_READ.key -> "true") {
checkAnswer(
spark.read.schema("ts timestamp").format("avro").load(path),
Row(Timestamp.valueOf(rebased)))

// The file metadata indicates if it needs rebase or not, so we can always get the correct
// result regardless of the "rebaseInRead" config.
Seq(true, false).foreach { rebase =>
withSQLConf(SQLConf.LEGACY_AVRO_REBASE_DATETIME_IN_READ.key -> rebase.toString) {
checkAnswer(
spark.read.schema("ts timestamp").format("avro").load(path),
Row(Timestamp.valueOf(rebased)))
}
}
withSQLConf(SQLConf.LEGACY_AVRO_REBASE_DATETIME_IN_READ.key -> "false") {

// Force rebase off to prove that the written datetime values are rebased, and that we get the
// wrong result if we don't rebase them back while reading.
withSQLConf("spark.test.forceNoRebase" -> "true") {
checkAnswer(
spark.read.schema("ts timestamp").format("avro").load(path),
Row(Timestamp.valueOf(nonRebased)))
@@ -1612,10 +1661,18 @@ abstract class AvroSuite extends QueryTest with SharedSparkSession {
.write.format("avro")
.save(path)
}
withSQLConf(SQLConf.LEGACY_AVRO_REBASE_DATETIME_IN_READ.key -> "true") {
checkAnswer(spark.read.format("avro").load(path), Row(Date.valueOf("1001-01-01")))

// The file metadata indicates if it needs rebase or not, so we can always get the correct
// result regardless of the "rebaseInRead" config.
Seq(true, false).foreach { rebase =>
withSQLConf(SQLConf.LEGACY_AVRO_REBASE_DATETIME_IN_READ.key -> rebase.toString) {
checkAnswer(spark.read.format("avro").load(path), Row(Date.valueOf("1001-01-01")))
}
}
withSQLConf(SQLConf.LEGACY_AVRO_REBASE_DATETIME_IN_READ.key -> "false") {

// Force rebase off to prove that the written datetime values are rebased, and that we get the
// wrong result if we don't rebase them back while reading.
withSQLConf("spark.test.forceNoRebase" -> "true") {
checkAnswer(spark.read.format("avro").load(path), Row(Date.valueOf("1001-01-07")))
}
}
@@ -38,7 +38,6 @@
import org.apache.spark.sql.catalyst.util.RebaseDateTime;
import org.apache.spark.sql.execution.datasources.SchemaColumnConvertNotSupportedException;
import org.apache.spark.sql.execution.vectorized.WritableColumnVector;
import org.apache.spark.sql.internal.SQLConf;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.DecimalType;

@@ -109,7 +108,8 @@ public VectorizedColumnReader(
ColumnDescriptor descriptor,
OriginalType originalType,
PageReader pageReader,
ZoneId convertTz) throws IOException {
ZoneId convertTz,
boolean rebaseDateTime) throws IOException {
this.descriptor = descriptor;
this.pageReader = pageReader;
this.convertTz = convertTz;
@@ -132,7 +132,7 @@ public VectorizedColumnReader(
if (totalValueCount == 0) {
throw new IOException("totalValueCount == 0");
}
this.rebaseDateTime = SQLConf.get().parquetRebaseDateTimeInReadEnabled();
this.rebaseDateTime = rebaseDateTime;
}

/**
@@ -86,7 +86,12 @@ public class VectorizedParquetRecordReader extends SpecificParquetRecordReaderBa
* The timezone that timestamp INT96 values should be converted to. Null if no conversion. Here to
* workaround incompatibilities between different engines when writing timestamp values.
*/
private ZoneId convertTz = null;
private final ZoneId convertTz;

/**
* True if date/timestamp values need to be rebased from the Julian to the Proleptic Gregorian calendar on read.
*/
private final boolean rebaseDateTime;

/**
* columnBatch object that is used for batch decoding. This is created on first use and triggers
@@ -116,12 +121,19 @@ public class VectorizedParquetRecordReader extends SpecificParquetRecordReaderBa
*/
private final MemoryMode MEMORY_MODE;

public VectorizedParquetRecordReader(ZoneId convertTz, boolean useOffHeap, int capacity) {
public VectorizedParquetRecordReader(
ZoneId convertTz, boolean rebaseDateTime, boolean useOffHeap, int capacity) {
this.convertTz = convertTz;
this.rebaseDateTime = rebaseDateTime;
MEMORY_MODE = useOffHeap ? MemoryMode.OFF_HEAP : MemoryMode.ON_HEAP;
this.capacity = capacity;
}

// For test only.
public VectorizedParquetRecordReader(boolean useOffHeap, int capacity) {
this(null, false, useOffHeap, capacity);
}

/**
* Implementation of RecordReader API.
*/
@@ -309,7 +321,7 @@ private void checkEndOfRowGroup() throws IOException {
for (int i = 0; i < columns.size(); ++i) {
if (missingColumns[i]) continue;
columnReaders[i] = new VectorizedColumnReader(columns.get(i), types.get(i).getOriginalType(),
pages.getPageReader(columns.get(i)), convertTz);
pages.getPageReader(columns.get(i)), convertTz, rebaseDateTime);
}
totalCountLoadedSoFar += pages.getRowCount();
}
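The reader now simply stores whatever flag it is given; the decision is expected at the call site, derived from the Parquet footer metadata in the same way the Avro read path does it above. A hedged sketch of that wiring (only `needRebaseDateTime`, `parquetRebaseDateTimeInReadEnabled`, and the reader constructor come from this diff; the surrounding names are assumptions):

```scala
import java.time.ZoneId

import org.apache.parquet.hadoop.metadata.FileMetaData

import org.apache.spark.sql.execution.datasources.DataSourceUtils
import org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader
import org.apache.spark.sql.internal.SQLConf

// Sketch: derive the per-file flag from the footer's key/value metadata,
// fall back to the session-level config, then build the vectorized reader.
def buildReader(
    footer: FileMetaData,
    convertTz: ZoneId,
    useOffHeap: Boolean,
    capacity: Int): VectorizedParquetRecordReader = {
  val rebaseDateTime = DataSourceUtils.needRebaseDateTime(
    footer.getKeyValueMetaData.get).getOrElse {
    SQLConf.get.parquetRebaseDateTimeInReadEnabled
  }
  new VectorizedParquetRecordReader(convertTz, rebaseDateTime, useOffHeap, capacity)
}
```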
@@ -21,8 +21,11 @@ import org.apache.hadoop.fs.Path
import org.json4s.NoTypeHints
import org.json4s.jackson.Serialization

import org.apache.spark.sql.{SPARK_LEGACY_DATETIME, SPARK_VERSION_METADATA_KEY}
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._
import org.apache.spark.util.Utils


object DataSourceUtils {
@@ -64,4 +67,18 @@

private[sql] def isDataFile(fileName: String) =
!(fileName.startsWith("_") || fileName.startsWith("."))

def needRebaseDateTime(lookupFileMeta: String => String): Option[Boolean] = {
if (Utils.isTesting && SQLConf.get.getConfString("spark.test.forceNoRebase", "") == "true") {
return Some(false)
}
// If there is no version, we return None and let the caller decide.
Option(lookupFileMeta(SPARK_VERSION_METADATA_KEY)).map { version =>
// Files written by Spark 2.4 and earlier follow the legacy hybrid calendar and we need to
// rebase the datetime values.
// Files written by Spark 3.0 and later may also need the rebase if they were written with
// the "rebaseInWrite" config enabled.
version < "3.0.0" || lookupFileMeta(SPARK_LEGACY_DATETIME) != null
}
}
}
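For concreteness, a hedged usage sketch of the cases this helper distinguishes, using hand-built metadata maps. It assumes the snippet lives somewhere the two metadata-key constants are visible (they are imported from `org.apache.spark.sql` elsewhere in this PR); the fallback on `None` mirrors the Avro read path above:

```scala
import org.apache.spark.sql.{SPARK_LEGACY_DATETIME, SPARK_VERSION_METADATA_KEY}
import org.apache.spark.sql.execution.datasources.DataSourceUtils

// Four representative files, described by their key/value metadata.
val by2_4         = Map(SPARK_VERSION_METADATA_KEY -> "2.4.5")
val by3_0         = Map(SPARK_VERSION_METADATA_KEY -> "3.0.0")
val by3_0WithFlag = Map(SPARK_VERSION_METADATA_KEY -> "3.0.0", SPARK_LEGACY_DATETIME -> "")
val byOtherWriter = Map.empty[String, String] // no Spark version recorded

def lookup(meta: Map[String, String]): String => String = key => meta.get(key).orNull

DataSourceUtils.needRebaseDateTime(lookup(by2_4))         // Some(true): written before 3.0.0
DataSourceUtils.needRebaseDateTime(lookup(by3_0))         // Some(false): 3.0 file, no legacy marker
DataSourceUtils.needRebaseDateTime(lookup(by3_0WithFlag)) // Some(true): 3.0 file written with rebase on
DataSourceUtils.needRebaseDateTime(lookup(byOtherWriter)) // None: caller falls back to the SQL conf
```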