address comments
cloud-fan committed Apr 21, 2020
commit 6905247094d22e939d9bd24177934573cc04be99
@@ -1529,64 +1529,50 @@ abstract class AvroSuite extends QueryTest with SharedSparkSession {
}

test("SPARK-31183: compatibility with Spark 2.4 in reading dates/timestamps") {
withSQLConf(SQLConf.LEGACY_AVRO_REBASE_DATETIME_IN_READ.key -> "true") {
// test reading the existing 2.4 files and 3.0 newly written files together.
withTempPath { path =>
val path2_4 = getResourceAvroFilePath("before_1582_date_v2_4.avro")
val path3_0 = path.getCanonicalPath
val dateStr = "1001-01-01"
Seq(dateStr).toDF("str").select($"str".cast("date").as("date"))
.write.format("avro").save(path3_0)
checkAnswer(
spark.read.format("avro").load(path2_4, path3_0),
Seq(
Row(java.sql.Date.valueOf(dateStr)),
Row(java.sql.Date.valueOf(dateStr))))
}

withTempPath { path =>
val path2_4 = getResourceAvroFilePath("before_1582_ts_micros_v2_4.avro")
val path3_0 = path.getCanonicalPath
val avroSchema =
"""
|{
| "type" : "record",
| "name" : "test_schema",
| "fields" : [
| {"name": "ts", "type": {"type": "long", "logicalType": "timestamp-micros"}}
| ]
|}""".stripMargin
val tsStr = "1001-01-01 01:02:03.123456"
Seq(tsStr).toDF("str").select($"str".cast("timestamp").as("ts"))
.write.format("avro").option("avroSchema", avroSchema).save(path3_0)
checkAnswer(
spark.read.format("avro").load(path2_4, path3_0),
Seq(
Row(java.sql.Timestamp.valueOf(tsStr)),
Row(java.sql.Timestamp.valueOf(tsStr))))
// test reading the existing 2.4 files and new 3.0 files (with rebase on/off) together.
def checkReadMixedFiles(fileName: String, dt: String, dataStr: String): Unit = {
withTempPaths(2) { paths =>
paths.foreach(_.delete())
val path2_4 = getResourceAvroFilePath(fileName)
val path3_0 = paths(0).getCanonicalPath
val path3_0_rebase = paths(1).getCanonicalPath
if (dt == "date") {
val df = Seq(dataStr).toDF("str").select($"str".cast("date").as("date"))
df.write.format("avro").save(path3_0)
withSQLConf(SQLConf.LEGACY_AVRO_REBASE_DATETIME_IN_WRITE.key -> "true") {
df.write.format("avro").save(path3_0_rebase)
}
checkAnswer(
spark.read.format("avro").load(path2_4, path3_0, path3_0_rebase),
1.to(3).map(_ => Row(java.sql.Date.valueOf(dataStr))))
} else {
val df = Seq(dataStr).toDF("str").select($"str".cast("timestamp").as("ts"))
val avroSchema =
s"""
|{
| "type" : "record",
| "name" : "test_schema",
| "fields" : [
| {"name": "ts", "type": {"type": "long", "logicalType": "$dt"}}
| ]
|}""".stripMargin
df.write.format("avro").option("avroSchema", avroSchema).save(path3_0)
withSQLConf(SQLConf.LEGACY_AVRO_REBASE_DATETIME_IN_WRITE.key -> "true") {
df.write.format("avro").option("avroSchema", avroSchema).save(path3_0_rebase)
}
checkAnswer(
spark.read.format("avro").load(path2_4, path3_0, path3_0_rebase),
1.to(3).map(_ => Row(java.sql.Timestamp.valueOf(dataStr))))
}
}
}

withTempPath { path =>
val path2_4 = getResourceAvroFilePath("before_1582_ts_millis_v2_4.avro")
val path3_0 = path.getCanonicalPath
val avroSchema =
"""
|{
| "type" : "record",
| "name" : "test_schema",
| "fields" : [
| {"name": "ts", "type": {"type": "long", "logicalType": "timestamp-millis"}}
| ]
|}""".stripMargin
val tsStr = "1001-01-01 01:02:03.124"
Seq(tsStr).toDF("str").select($"str".cast("timestamp").as("ts"))
.write.format("avro").option("avroSchema", avroSchema).save(path3_0)
checkAnswer(
spark.read.format("avro").load(path2_4, path3_0),
Seq(
Row(java.sql.Timestamp.valueOf(tsStr)),
Row(java.sql.Timestamp.valueOf(tsStr))))
}
withSQLConf(SQLConf.LEGACY_AVRO_REBASE_DATETIME_IN_READ.key -> "true") {
checkReadMixedFiles("before_1582_date_v2_4.avro", "date", "1001-01-01")
checkReadMixedFiles(
"before_1582_ts_micros_v2_4.avro", "timestamp-micros", "1001-01-01 01:02:03.123456")
checkReadMixedFiles(
"before_1582_ts_millis_v2_4.avro", "timestamp-millis", "1001-01-01 01:02:03.124")
}
}
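Outside the test harness, the behaviour this helper exercises looks roughly like the following. This is a minimal sketch, not part of the change, assuming a SparkSession named spark with its implicits imported; the paths are hypothetical and the config keys are the ones referenced in the test above.

import org.apache.spark.sql.internal.SQLConf
import spark.implicits._

// Write pre-1582 dates rebased to the hybrid Julian calendar, i.e. files 2.4 can read back unchanged.
spark.conf.set(SQLConf.LEGACY_AVRO_REBASE_DATETIME_IN_WRITE.key, "true")
Seq("1001-01-01").toDF("str")
  .select($"str".cast("date").as("date"))
  .write.format("avro").save("/tmp/avro_legacy")        // hypothetical path

// Read 2.4-written and 3.0-written files together, rebasing the legacy values on read.
spark.conf.set(SQLConf.LEGACY_AVRO_REBASE_DATETIME_IN_READ.key, "true")
spark.read.format("avro").load("/tmp/avro_legacy", "/tmp/avro_new").show()   // hypothetical paths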

@@ -31,7 +31,7 @@ import org.apache.spark.sql.types.StructType
* @param parquetSchema Parquet schema of the records to be read
* @param catalystSchema Catalyst schema of the rows to be constructed
* @param schemaConverter A Parquet-Catalyst schema converter that helps initializing row converters
* @param convertTz the optional time zone to convert to for int96 data
* @param convertTz the optional time zone to convert to int96 data
* @param rebaseDateTime true if need to rebase date/timestamp from Julian to Proleptic Gregorian
* calendar
*/
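For context on the rebaseDateTime flag: the hybrid Julian/Gregorian calendar used by Spark 2.4 (and the java.sql types) and the proleptic Gregorian calendar used by Spark 3.0 (and java.time) disagree by several days for dates this old. A standalone sketch using only JDK classes, not part of the change, showing the gap for 1001-01-01:

import java.time.LocalDate
import java.util.{Calendar, GregorianCalendar, TimeZone}

// Epoch day of "1001-01-01" in the proleptic Gregorian calendar (java.time).
val prolepticDays = LocalDate.of(1001, 1, 1).toEpochDay

// Epoch day of "1001-01-01" in the hybrid Julian/Gregorian calendar, computed in UTC.
val cal = new GregorianCalendar(TimeZone.getTimeZone("UTC"))
cal.clear()
cal.set(1001, Calendar.JANUARY, 1)
val hybridDays = Math.floorDiv(cal.getTimeInMillis, 86400000L)

// The two calendars disagree by 6 days for this date, which is why values must be
// rebased when files written by different Spark versions are read together.
println(hybridDays - prolepticDays)   // expected: 6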
@@ -120,7 +120,7 @@ private[parquet] class ParquetPrimitiveConverter(val updater: ParentContainerUpd
* @param parquetType Parquet schema of Parquet records
* @param catalystType Spark SQL schema that corresponds to the Parquet record type. User-defined
* types should have been expanded.
* @param convertTz the optional time zone to convert to for int96 data
* @param convertTz the optional time zone to convert to int96 data
* @param rebaseDateTime true if need to rebase date/timestamp from Julian to Proleptic Gregorian
* calendar
* @param updater An updater which propagates converted field values to the parent container
sql/core/src/main/scala/org/apache/spark/sql/package.scala (2 changes: 1 addition & 1 deletion)
@@ -59,5 +59,5 @@ package object sql {
* Parquet/Avro file metadata key to indicate that the file was written with legacy datetime
* values.
*/
private[sql] val SPARK_LEGACY_DATETIME = "org.apache.spark.legacyDatetime"
private[sql] val SPARK_LEGACY_DATETIME = "org.apache.spark.legacyDateTime"
}
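If the writer stamps this key into the file header as the comment above describes, its presence can be checked with the upstream Avro Java API. A sketch under that assumption; the file path is hypothetical, and getMetaString returns null when the key is absent.

import java.io.File
import org.apache.avro.file.DataFileReader
import org.apache.avro.generic.{GenericDatumReader, GenericRecord}

// Open an Avro container file and look up the Spark-specific metadata key.
val reader = new DataFileReader[GenericRecord](
  new File("/tmp/avro_legacy/part-00000.avro"),          // hypothetical file
  new GenericDatumReader[GenericRecord]())
println(reader.getMetaString("org.apache.spark.legacyDateTime"))
reader.close()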
@@ -883,55 +883,49 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession
}

test("SPARK-31159: compatibility with Spark 2.4 in reading dates/timestamps") {
Seq(false, true).foreach { vectorized =>
withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> vectorized.toString) {
withSQLConf(SQLConf.LEGACY_PARQUET_REBASE_DATETIME_IN_READ.key -> "true") {
// test reading the existing 2.4 files and 3.0 newly written files together.
withTempPath { path =>
val path2_4 = getResourceParquetFilePath(
"test-data/before_1582_date_v2_4.snappy.parquet")
val path3_0 = path.getCanonicalPath
val dateStr = "1001-01-01"
Seq(dateStr).toDF("str").select($"str".cast("date").as("date"))
.write.parquet(path3_0)
checkAnswer(
spark.read.format("parquet").load(path2_4, path3_0),
Seq(
Row(java.sql.Date.valueOf(dateStr)),
Row(java.sql.Date.valueOf(dateStr))))
// test reading the existing 2.4 files and new 3.0 files (with rebase on/off) together.
def checkReadMixedFiles(fileName: String, dt: String, dataStr: String): Unit = {
withTempPaths(2) { paths =>
paths.foreach(_.delete())
val path2_4 = getResourceParquetFilePath("test-data/" + fileName)
val path3_0 = paths(0).getCanonicalPath
val path3_0_rebase = paths(1).getCanonicalPath
if (dt == "date") {
val df = Seq(dataStr).toDF("str").select($"str".cast("date").as("date"))
df.write.parquet(path3_0)
withSQLConf(SQLConf.LEGACY_PARQUET_REBASE_DATETIME_IN_WRITE.key -> "true") {
df.write.parquet(path3_0_rebase)
}

withTempPath { path =>
withSQLConf(SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key -> "TIMESTAMP_MICROS") {
val path2_4 = getResourceParquetFilePath(
"test-data/before_1582_timestamp_micros_v2_4.snappy.parquet")
val path3_0 = path.getCanonicalPath
val tsStr = "1001-01-01 01:02:03.123456"
Seq(tsStr).toDF("str").select($"str".cast("timestamp").as("ts"))
.write.parquet(path3_0)
checkAnswer(
spark.read.format("parquet").load(path2_4, path3_0),
Seq(
Row(java.sql.Timestamp.valueOf(tsStr)),
Row(java.sql.Timestamp.valueOf(tsStr))))
checkAnswer(
spark.read.format("parquet").load(path2_4, path3_0, path3_0_rebase),
1.to(3).map(_ => Row(java.sql.Date.valueOf(dataStr))))
} else {
val df = Seq(dataStr).toDF("str").select($"str".cast("timestamp").as("ts"))
withSQLConf(SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key -> dt) {
df.write.parquet(path3_0)
withSQLConf(SQLConf.LEGACY_PARQUET_REBASE_DATETIME_IN_WRITE.key -> "true") {
df.write.parquet(path3_0_rebase)
}
}
checkAnswer(
spark.read.format("parquet").load(path2_4, path3_0, path3_0_rebase),
1.to(3).map(_ => Row(java.sql.Timestamp.valueOf(dataStr))))
}
}
}

withTempPath { path =>
withSQLConf(SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key -> "TIMESTAMP_MILLIS") {
val path2_4 = getResourceParquetFilePath(
"test-data/before_1582_timestamp_millis_v2_4.snappy.parquet")
val path3_0 = path.getCanonicalPath
val tsStr = "1001-01-01 01:02:03.123"
Seq(tsStr).toDF("str").select($"str".cast("timestamp").as("ts"))
.write.parquet(path3_0)
checkAnswer(
spark.read.format("parquet").load(path2_4, path3_0),
Seq(
Row(java.sql.Timestamp.valueOf(tsStr)),
Row(java.sql.Timestamp.valueOf(tsStr))))
}
}
Seq(false, true).foreach { vectorized =>
withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> vectorized.toString) {
withSQLConf(SQLConf.LEGACY_PARQUET_REBASE_DATETIME_IN_READ.key -> "true") {
checkReadMixedFiles("before_1582_date_v2_4.snappy.parquet", "date", "1001-01-01")
checkReadMixedFiles(
"before_1582_timestamp_micros_v2_4.snappy.parquet",
"TIMESTAMP_MICROS",
"1001-01-01 01:02:03.123456")
checkReadMixedFiles(
"before_1582_timestamp_millis_v2_4.snappy.parquet",
"TIMESTAMP_MILLIS",
"1001-01-01 01:02:03.123")
}

// INT96 is a legacy timestamp format and we always rebase the seconds for it.
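The Parquet side mirrors the Avro sketch earlier: the output timestamp type and the write-side rebase flag control what gets written, and the read-side flag rebases legacy files on read. A minimal sketch, not part of the change, assuming a SparkSession named spark with its implicits imported; paths are hypothetical.

import org.apache.spark.sql.internal.SQLConf
import spark.implicits._

val df = Seq("1001-01-01 01:02:03.123").toDF("str").select($"str".cast("timestamp").as("ts"))

// Write pre-1582 timestamps as TIMESTAMP_MILLIS, rebased to the hybrid calendar on write.
spark.conf.set(SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key, "TIMESTAMP_MILLIS")
spark.conf.set(SQLConf.LEGACY_PARQUET_REBASE_DATETIME_IN_WRITE.key, "true")
df.write.parquet("/tmp/parquet_legacy")                          // hypothetical path

// Read 2.4-written and 3.0-written files together, rebasing the legacy values on read.
spark.conf.set(SQLConf.LEGACY_PARQUET_REBASE_DATETIME_IN_READ.key, "true")
spark.read.parquet("/tmp/parquet_legacy", "/tmp/parquet_new").show()   // hypothetical paths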