-
Notifications
You must be signed in to change notification settings - Fork 29k
[SPARK-25484][SQL][TEST] Refactor ExternalAppendOnlyUnsafeRowArrayBenchmark #22617
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 5 commits
5bc905a
93892de
a26dbc7
0f676bc
0b04fa3
702e61c
87c4180
411174a
b0d829e
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,50 @@ | ||
| ================================================================================================ | ||
| WITHOUT SPILL | ||
| ================================================================================================ | ||
|
|
||
| Java HotSpot(TM) 64-Bit Server VM 1.8.0_162-b12 on Mac OS X 10.13.6 | ||
| Intel(R) Core(TM) i7-4870HQ CPU @ 2.50GHz | ||
|
|
||
| Array with 100000 rows: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative | ||
| ------------------------------------------------------------------------------------------------ | ||
| ArrayBuffer 4810 / 5120 21.3 47.0 1.0X | ||
| ExternalAppendOnlyUnsafeRowArray 4996 / 5043 20.5 48.8 1.0X | ||
|
|
||
| Java HotSpot(TM) 64-Bit Server VM 1.8.0_162-b12 on Mac OS X 10.13.6 | ||
| Intel(R) Core(TM) i7-4870HQ CPU @ 2.50GHz | ||
|
|
||
| Array with 1000 rows: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative | ||
| ------------------------------------------------------------------------------------------------ | ||
| ArrayBuffer 8839 / 8951 29.7 33.7 1.0X | ||
| ExternalAppendOnlyUnsafeRowArray 9884 / 9888 26.5 37.7 0.9X | ||
|
|
||
| Java HotSpot(TM) 64-Bit Server VM 1.8.0_162-b12 on Mac OS X 10.13.6 | ||
| Intel(R) Core(TM) i7-4870HQ CPU @ 2.50GHz | ||
|
|
||
| Array with 30000 rows: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative | ||
| ------------------------------------------------------------------------------------------------ | ||
| ArrayBuffer 19980 / 20061 24.6 40.6 1.0X | ||
| ExternalAppendOnlyUnsafeRowArray 21630 / 21645 22.7 44.0 0.9X | ||
|
|
||
|
|
||
| ================================================================================================ | ||
| WITH SPILL | ||
| ================================================================================================ | ||
|
|
||
| Java HotSpot(TM) 64-Bit Server VM 1.8.0_162-b12 on Mac OS X 10.13.6 | ||
| Intel(R) Core(TM) i7-4870HQ CPU @ 2.50GHz | ||
|
|
||
| Spilling with 1000 rows: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative | ||
| ------------------------------------------------------------------------------------------------ | ||
| UnsafeExternalSorter 15829 / 15845 16.6 60.4 1.0X | ||
| ExternalAppendOnlyUnsafeRowArray 10158 / 10174 25.8 38.7 1.6X | ||
|
||
|
|
||
| Java HotSpot(TM) 64-Bit Server VM 1.8.0_162-b12 on Mac OS X 10.13.6 | ||
| Intel(R) Core(TM) i7-4870HQ CPU @ 2.50GHz | ||
|
|
||
| Spilling with 10000 rows: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative | ||
| ------------------------------------------------------------------------------------------------ | ||
| UnsafeExternalSorter 6 / 6 28.3 35.3 1.0X | ||
| ExternalAppendOnlyUnsafeRowArray 6 / 7 27.7 36.1 1.0X | ||
|
|
||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I ran the original master branch and get the following. Since the trend is the same, this refactoring PR looks safe. |
||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -20,24 +20,56 @@ package org.apache.spark.sql.execution | |
| import scala.collection.mutable.ArrayBuffer | ||
|
|
||
| import org.apache.spark.{SparkConf, SparkContext, SparkEnv, TaskContext} | ||
| import org.apache.spark.benchmark.Benchmark | ||
| import org.apache.spark.benchmark.{Benchmark, BenchmarkBase} | ||
| import org.apache.spark.internal.config | ||
| import org.apache.spark.memory.MemoryTestingUtils | ||
| import org.apache.spark.sql.catalyst.expressions.UnsafeRow | ||
| import org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter | ||
|
|
||
| object ExternalAppendOnlyUnsafeRowArrayBenchmark { | ||
| /** | ||
| * Benchmark ExternalAppendOnlyUnsafeRowArray. | ||
| * To run this benchmark: | ||
| * {{{ | ||
| * 1. without sbt: | ||
| * bin/spark-submit --class <this class> --jars <spark core test jar> <spark sql test jar> | ||
| * 2. build/sbt "sql/test:runMain <this class>" | ||
|
||
| * 3. generate result: SPARK_GENERATE_BENCHMARK_FILES=1 build/sbt ";project sql;set javaOptions | ||
| * in Test -= \"-Dspark.memory.debugFill=true\";test:runMain <this class>" | ||
| * Results will be written to | ||
| * "benchmarks/ExternalAppendOnlyUnsafeRowArrayBenchmark-results.txt". | ||
| * }}} | ||
| */ | ||
| object ExternalAppendOnlyUnsafeRowArrayBenchmark extends BenchmarkBase { | ||
|
|
||
| def testAgainstRawArrayBuffer(numSpillThreshold: Int, numRows: Int, iterations: Int): Unit = { | ||
| private val conf = new SparkConf(false) | ||
| // Make the Java serializer write a reset instruction (TC_RESET) after each object to test | ||
| // for a bug we had with bytes written past the last object in a batch (SPARK-2792) | ||
| .set("spark.serializer.objectStreamReset", "1") | ||
| .set("spark.serializer", "org.apache.spark.serializer.JavaSerializer") | ||
|
|
||
| private def withFakeTaskContext(f: => Unit): Unit = { | ||
| val sc = new SparkContext("local", "test", conf) | ||
| val taskContext = MemoryTestingUtils.fakeTaskContext(SparkEnv.get) | ||
| TaskContext.setTaskContext(taskContext) | ||
| f | ||
| sc.stop() | ||
| } | ||
|
|
||
| private def testRows(numRows: Int): Seq[UnsafeRow] = { | ||
| val random = new java.util.Random() | ||
| val rows = (1 to numRows).map(_ => { | ||
| (1 to numRows).map(_ => { | ||
| val row = new UnsafeRow(1) | ||
| row.pointTo(new Array[Byte](64), 16) | ||
| row.setLong(0, random.nextLong()) | ||
| row | ||
| }) | ||
| } | ||
|
|
||
| val benchmark = new Benchmark(s"Array with $numRows rows", iterations * numRows) | ||
| def testAgainstRawArrayBuffer(numSpillThreshold: Int, numRows: Int, iterations: Int): Unit = { | ||
| val rows = testRows(numRows) | ||
|
|
||
| val benchmark = new Benchmark(s"Array with $numRows rows", iterations * numRows, | ||
| output = output) | ||
|
|
||
| // Internally, `ExternalAppendOnlyUnsafeRowArray` will create an | ||
| // in-memory buffer of size `numSpillThreshold`. This will mimic that | ||
|
|
@@ -82,33 +114,19 @@ object ExternalAppendOnlyUnsafeRowArrayBenchmark { | |
| } | ||
| } | ||
|
|
||
| val conf = new SparkConf(false) | ||
| // Make the Java serializer write a reset instruction (TC_RESET) after each object to test | ||
| // for a bug we had with bytes written past the last object in a batch (SPARK-2792) | ||
| conf.set("spark.serializer.objectStreamReset", "1") | ||
| conf.set("spark.serializer", "org.apache.spark.serializer.JavaSerializer") | ||
|
|
||
| val sc = new SparkContext("local", "test", conf) | ||
| val taskContext = MemoryTestingUtils.fakeTaskContext(SparkEnv.get) | ||
| TaskContext.setTaskContext(taskContext) | ||
| benchmark.run() | ||
| sc.stop() | ||
| withFakeTaskContext { | ||
| benchmark.run() | ||
| } | ||
| } | ||
|
|
||
| def testAgainstRawUnsafeExternalSorter( | ||
| numSpillThreshold: Int, | ||
| numRows: Int, | ||
| iterations: Int): Unit = { | ||
| val rows = testRows(numRows) | ||
|
|
||
| val random = new java.util.Random() | ||
| val rows = (1 to numRows).map(_ => { | ||
| val row = new UnsafeRow(1) | ||
| row.pointTo(new Array[Byte](64), 16) | ||
| row.setLong(0, random.nextLong()) | ||
| row | ||
| }) | ||
|
|
||
| val benchmark = new Benchmark(s"Spilling with $numRows rows", iterations * numRows) | ||
| val benchmark = new Benchmark(s"Spilling with $numRows rows", iterations * numRows, | ||
| output = output) | ||
|
|
||
| benchmark.addCase("UnsafeExternalSorter") { _: Int => | ||
| var sum = 0L | ||
|
|
@@ -158,80 +176,23 @@ object ExternalAppendOnlyUnsafeRowArrayBenchmark { | |
| } | ||
| } | ||
|
|
||
| val conf = new SparkConf(false) | ||
| // Make the Java serializer write a reset instruction (TC_RESET) after each object to test | ||
| // for a bug we had with bytes written past the last object in a batch (SPARK-2792) | ||
| conf.set("spark.serializer.objectStreamReset", "1") | ||
| conf.set("spark.serializer", "org.apache.spark.serializer.JavaSerializer") | ||
|
|
||
| val sc = new SparkContext("local", "test", conf) | ||
| val taskContext = MemoryTestingUtils.fakeTaskContext(SparkEnv.get) | ||
| TaskContext.setTaskContext(taskContext) | ||
| benchmark.run() | ||
| sc.stop() | ||
| withFakeTaskContext { | ||
| benchmark.run() | ||
| } | ||
| } | ||
|
|
||
| def main(args: Array[String]): Unit = { | ||
|
|
||
| // ========================================================================================= // | ||
| // WITHOUT SPILL | ||
| // ========================================================================================= // | ||
|
|
||
| val spillThreshold = 100 * 1000 | ||
|
|
||
| /* | ||
| Intel(R) Core(TM) i7-6920HQ CPU @ 2.90GHz | ||
|
|
||
| Array with 1000 rows: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative | ||
| ------------------------------------------------------------------------------------------------ | ||
| ArrayBuffer 7821 / 7941 33.5 29.8 1.0X | ||
| ExternalAppendOnlyUnsafeRowArray 8798 / 8819 29.8 33.6 0.9X | ||
| */ | ||
| testAgainstRawArrayBuffer(spillThreshold, 1000, 1 << 18) | ||
|
|
||
| /* | ||
| Intel(R) Core(TM) i7-6920HQ CPU @ 2.90GHz | ||
|
|
||
| Array with 30000 rows: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative | ||
| ------------------------------------------------------------------------------------------------ | ||
| ArrayBuffer 19200 / 19206 25.6 39.1 1.0X | ||
| ExternalAppendOnlyUnsafeRowArray 19558 / 19562 25.1 39.8 1.0X | ||
| */ | ||
| testAgainstRawArrayBuffer(spillThreshold, 30 * 1000, 1 << 14) | ||
|
|
||
| /* | ||
| Intel(R) Core(TM) i7-6920HQ CPU @ 2.90GHz | ||
|
|
||
| Array with 100000 rows: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative | ||
| ------------------------------------------------------------------------------------------------ | ||
| ArrayBuffer 5949 / 6028 17.2 58.1 1.0X | ||
| ExternalAppendOnlyUnsafeRowArray 6078 / 6138 16.8 59.4 1.0X | ||
| */ | ||
| testAgainstRawArrayBuffer(spillThreshold, 100 * 1000, 1 << 10) | ||
|
|
||
| // ========================================================================================= // | ||
| // WITH SPILL | ||
| // ========================================================================================= // | ||
|
|
||
| /* | ||
| Intel(R) Core(TM) i7-6920HQ CPU @ 2.90GHz | ||
|
|
||
| Spilling with 1000 rows: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative | ||
| ------------------------------------------------------------------------------------------------ | ||
| UnsafeExternalSorter 9239 / 9470 28.4 35.2 1.0X | ||
| ExternalAppendOnlyUnsafeRowArray 8857 / 8909 29.6 33.8 1.0X | ||
| */ | ||
| testAgainstRawUnsafeExternalSorter(100 * 1000, 1000, 1 << 18) | ||
|
|
||
| /* | ||
| Intel(R) Core(TM) i7-6920HQ CPU @ 2.90GHz | ||
|
|
||
| Spilling with 10000 rows: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative | ||
| ------------------------------------------------------------------------------------------------ | ||
| UnsafeExternalSorter 4 / 5 39.3 25.5 1.0X | ||
| ExternalAppendOnlyUnsafeRowArray 5 / 6 29.8 33.5 0.8X | ||
| */ | ||
| testAgainstRawUnsafeExternalSorter( | ||
| config.SHUFFLE_SPILL_NUM_ELEMENTS_FORCE_SPILL_THRESHOLD.defaultValue.get, 10 * 1000, 1 << 4) | ||
| override def runBenchmarkSuite(mainArgs: Array[String]): Unit = { | ||
| runBenchmark("WITHOUT SPILL") { | ||
| val spillThreshold = 100 * 1000 | ||
| testAgainstRawArrayBuffer(spillThreshold, 100 * 1000, 1 << 10) | ||
| testAgainstRawArrayBuffer(spillThreshold, 1000, 1 << 18) | ||
| testAgainstRawArrayBuffer(spillThreshold, 30 * 1000, 1 << 14) | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
| } | ||
|
|
||
| runBenchmark("WITH SPILL") { | ||
| testAgainstRawUnsafeExternalSorter(100 * 1000, 1000, 1 << 18) | ||
| testAgainstRawUnsafeExternalSorter( | ||
| config.SHUFFLE_SPILL_NUM_ELEMENTS_FORCE_SPILL_THRESHOLD.defaultValue.get, 10 * 1000, 1 << 4) | ||
| } | ||
| } | ||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could you run this once more in your side? For me, I've got the followings. The ratio difference is too big.
Mac
EC2 Server
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The is the only difference.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm rerunning it soon
Uh oh!
There was an error while loading. Please reload this page.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Got the same ratio as you have this time:
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thank you for confirmation. Master branch seems to be changed.