Benchmarks for dates
MaxGekk committed Mar 28, 2020
commit f3c30b7de2abdadcb14bd9d878700f07a4e59566
@@ -17,7 +17,7 @@

 package org.apache.spark.sql.execution.benchmark
 
-import java.time.{LocalDateTime, ZoneOffset}
+import java.time.{LocalDate, LocalDateTime, ZoneOffset}
 
 import org.apache.spark.benchmark.Benchmark
 import org.apache.spark.sql.DataFrame
@@ -59,113 +59,88 @@ object DateTimeRebaseBenchmark extends SqlBasedBenchmark {

   }
 
-  private def save(df: DataFrame, path: String, format: String = "parquet"): Unit = {
-    df.write.mode("overwrite").format(format).save(path)
+  private def genDateAfter1582(cardinality: Int): DataFrame = {
+    genTsAfter1582(cardinality).select($"ts".cast("date").as("date"))
   }
 
-  private def load(path: String, format: String = "parquet"): Unit = {
-    spark.read.format(format).load(path).noop()
+  private def genDateBefore1582(cardinality: Int): DataFrame = {
+    genTsBefore1582(cardinality).select($"ts".cast("date").as("date"))
   }
 
+  private def genDF(cardinality: Int, dateTime: String, after1582: Boolean): DataFrame = {
+    (dateTime, after1582) match {
+      case ("date", true) => genDateAfter1582(cardinality)
+      case ("date", false) => genDateBefore1582(cardinality)
+      case ("timestamp", true) => genTsAfter1582(cardinality)
+      case ("timestamp", false) => genTsBefore1582(cardinality)
+      case _ => throw new IllegalArgumentException(
+        s"cardinality = $cardinality dateTime = $dateTime after1582 = $after1582")
+    }
+  }
+
   override def runBenchmarkSuite(mainArgs: Array[String]): Unit = {
     withTempPath { path =>
       runBenchmark("Parquet read/write") {

[Review thread on the benchmark title in runBenchmark("Parquet read/write")]

Member: Could you use a more specific benchmark title? This one is used in the generated files.

Member Author: Isn't the name scoped by the concrete benchmark?

Member Author: I will address this together with the other comments, because launching an EC2 instance and re-running the benchmark twice for JDK 8 and 11 is a time-consuming process.

Contributor: +1 to make the title mention rebase.

Member Author: I am going to replace it with "Rebasing dates/timestamps in Parquet datasource".
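(For context: the title passed to Benchmark is written verbatim into the generated result files, which Spark benchmarks typically produce via SPARK_GENERATE_BENCHMARK_FILES=1 build/sbt "sql/test:runMain org.apache.spark.sql.execution.benchmark.DateTimeRebaseBenchmark"; hence the request for a more specific name.)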

         val rowsNum = 100000000
-        var numIters = 1
-        var benchmark = new Benchmark("Save timestamps to parquet", rowsNum, output = output)
-        benchmark.addCase("after 1582, noop", numIters) { _ =>
-          genTsAfter1582(rowsNum).noop()
-        }
-        val ts_after_1582_off = path.getAbsolutePath + "/ts_after_1582_off"
-        benchmark.addCase("after 1582, rebase off", numIters) { _ =>
-          withSQLConf(SQLConf.LEGACY_PARQUET_REBASE_DATETIME.key -> "false") {
-            save(genTsAfter1582(rowsNum), ts_after_1582_off)
-          }
-        }
-        val ts_after_1582_on = path.getAbsolutePath + "/ts_after_1582_on"
-        benchmark.addCase("after 1582, rebase on", numIters) { _ =>
-          withSQLConf(SQLConf.LEGACY_PARQUET_REBASE_DATETIME.key -> "true") {
-            save(genTsAfter1582(rowsNum), ts_after_1582_on)
-          }
-        }
-        benchmark.addCase("before 1582, noop", numIters) { _ =>
-          genTsBefore1582(rowsNum).noop()
-        }
-        val ts_before_1582_off = path.getAbsolutePath + "/ts_before_1582_off"
-        benchmark.addCase("before 1582, rebase off", numIters) { _ =>
-          withSQLConf(SQLConf.LEGACY_PARQUET_REBASE_DATETIME.key -> "false") {
-            save(genTsBefore1582(rowsNum), ts_before_1582_off)
-          }
-        }
-        val ts_before_1582_on = path.getAbsolutePath + "/ts_before_1582_on"
-        benchmark.addCase("before 1582, rebase on", numIters) { _ =>
-          withSQLConf(SQLConf.LEGACY_PARQUET_REBASE_DATETIME.key -> "true") {
-            save(genTsBefore1582(rowsNum), ts_before_1582_on)
-          }
-        }
-        benchmark.run()
-
-        numIters = 3
-        benchmark = new Benchmark("Load timestamps from parquet", rowsNum, output = output)
-        benchmark.addCase("after 1582, vec off, rebase off", numIters) { _ =>
-          withSQLConf(
-            SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> "false",
-            SQLConf.LEGACY_PARQUET_REBASE_DATETIME.key -> "false") {
-            load(ts_after_1582_off)
-          }
-        }
-        benchmark.addCase("after 1582, vec off, rebase on", numIters) { _ =>
-          withSQLConf(
-            SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> "false",
-            SQLConf.LEGACY_PARQUET_REBASE_DATETIME.key -> "true") {
-            load(ts_after_1582_on)
-          }
-        }
-        benchmark.addCase("after 1582, vec on, rebase off", numIters) { _ =>
-          withSQLConf(
-            SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> "true",
-            SQLConf.LEGACY_PARQUET_REBASE_DATETIME.key -> "false") {
-            load(ts_after_1582_off)
+        Seq("date", "timestamp").foreach { dateTime =>
+          val benchmark = new Benchmark(s"Save ${dateTime}s to parquet", rowsNum, output = output)
+          benchmark.addCase("after 1582, noop", 1) { _ =>
+            genDF(rowsNum, dateTime, after1582 = true).noop()

[Review thread on the noop benchmark case]

Contributor: Do you include the dataframe generation in the benchmark numbers? I think it should be excluded.

Member Author: We have already discussed this in PRs for other benchmarks. The overhead of preparing the input dataframe is assumed to be subtracted from the other numbers.

Member Author: For example:

after 1582, noop                                   9272           9272           0         10.8          92.7       1.0X
after 1582, rebase off                            21841          21841           0          4.6         218.4       0.4X

The noop benchmark shows unavoidable overhead. If we subtract it, we get 21841 - 9272 = 12569. So the overhead of preparing the input data is roughly 45% of the total. I do believe this is important info, and we should keep it in the benchmark results.
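To make the subtraction concrete, here is a tiny self-contained sketch using the two numbers from the report above. The object name is invented for illustration; note the generation share computes to about 42%, close to the author's "roughly 45%".

object NoopBaselineMath extends App {
  // Times (ms) copied from the example report above.
  val noopMs = 9272.0       // "after 1582, noop": generating the input dataframe only
  val rebaseOffMs = 21841.0 // "after 1582, rebase off": generation + parquet write
  val netWriteMs = rebaseOffMs - noopMs      // write time with generation subtracted: 12569 ms
  val genShare = noopMs / rebaseOffMs * 100  // generation's share of the reported total
  println(f"net write: $netWriteMs%.0f ms, generation share: $genShare%.1f%%")
}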

-          }
-        }
-        benchmark.addCase("after 1582, vec on, rebase on", numIters) { _ =>
-          withSQLConf(
-            SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> "true",
-            SQLConf.LEGACY_PARQUET_REBASE_DATETIME.key -> "true") {
-            load(ts_after_1582_on)
-          }
-        }
+          }
+          benchmark.addCase("before 1582, noop", 1) { _ =>
+            genDF(rowsNum, dateTime, after1582 = false).noop()
+          }

benchmark.addCase("before 1582, vec off, rebase off", numIters) { _ =>
withSQLConf(
SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> "false",
SQLConf.LEGACY_PARQUET_REBASE_DATETIME.key -> "false") {
load(ts_before_1582_off)
def save(after1582: Boolean, rebase: Boolean): Unit = {
val period = if (after1582) "after" else "before"
val rebaseFlag = if (rebase) "on" else "off"
val caseName = s"$period 1582, rebase $rebaseFlag"
benchmark.addCase(caseName, 1) { _ =>
withSQLConf(SQLConf.LEGACY_PARQUET_REBASE_DATETIME.key -> rebase.toString) {
val df = genDF(rowsNum, dateTime, after1582)
val pathToWrite = path.getAbsolutePath + s"/${dateTime}_${period}_1582_$rebaseFlag"
df.write
.mode("overwrite")
.format("parquet")
.save(pathToWrite)
}
}
}
}
benchmark.addCase("before 1582, vec off, rebase on", numIters) { _ =>
withSQLConf(
SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> "false",
SQLConf.LEGACY_PARQUET_REBASE_DATETIME.key -> "true") {
load(ts_before_1582_on)

Seq(true, false).foreach { after1582 =>
Seq(false, true).foreach { rebase =>
save(after1582, rebase)
}
}
}
benchmark.addCase("before 1582, vec on, rebase off", numIters) { _ =>
withSQLConf(
SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> "true",
SQLConf.LEGACY_PARQUET_REBASE_DATETIME.key -> "false") {
load(ts_before_1582_off)
benchmark.run()

val benchmark2 = new Benchmark(s"Load $dateTime from parquet", rowsNum, output = output)

def load(after1582: Boolean, vec: Boolean, rebase: Boolean): Unit = {
val period = if (after1582) "after" else "before"
val rebaseFlag = if (rebase) "on" else "off"
val vecFlag = if (vec) "on" else "off"
val caseName = s"$period 1582, vec $vecFlag, rebase $rebaseFlag"
benchmark2.addCase(caseName, 3) { _ =>
withSQLConf(
SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> vec.toString,
SQLConf.LEGACY_PARQUET_REBASE_DATETIME.key -> rebase.toString) {
val pathToRead = path.getAbsolutePath + s"/${dateTime}_${period}_1582_$rebaseFlag"
spark.read.format("parquet").load(pathToRead).noop()
}
}
}
}
benchmark.addCase("before 1582, vec on, rebase on", numIters) { _ =>
withSQLConf(
SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> "true",
SQLConf.LEGACY_PARQUET_REBASE_DATETIME.key -> "true") {
load(ts_before_1582_on)

Seq(true, false).foreach { after1582 =>
Seq(false, true).foreach { vec =>
Seq(false, true).foreach { rebase =>
load(after1582, vec, rebase)
}
}
}
}

benchmark.run()
benchmark2.run()
       }
     }
   }
 }
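The genTsAfter1582 and genTsBefore1582 generators used by the new genDate* helpers are defined earlier in the file and fall outside this diff. For readers without the full file, here is a minimal, hypothetical sketch of what such a generator could look like. The name genTs, the range arithmetic, and the single-partition range are assumptions for illustration; only the "ts" column name, the LocalDateTime/ZoneOffset imports, and the availability of spark.implicits._ (implied by the $"ts" syntax in the diff) come from the change itself.

  // Hypothetical sketch, not the actual implementation from the file.
  private def genTs(cardinality: Int, start: LocalDateTime, end: LocalDateTime): DataFrame = {
    import org.apache.spark.sql.functions.lit
    val startSec = start.toEpochSecond(ZoneOffset.UTC)
    val endSec = end.toEpochSecond(ZoneOffset.UTC)
    // Spread cardinality rows over [start, end) and cast epoch seconds to timestamp.
    spark.range(0, cardinality, 1, 1)
      .select((lit(startSec) + ($"id" % (endSec - startSec))).cast("timestamp").as("ts"))
  }

  // Usage, e.g.: genTs(n, LocalDateTime.of(1582, 10, 15, 0, 0), LocalDateTime.of(2020, 1, 1, 0, 0))
  // for the "after 1582" flavor, and an earlier range for "before 1582".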