diff --git a/core/benchmarks/MapStatusesSerDeserBenchmark-jdk11-results.txt b/core/benchmarks/MapStatusesSerDeserBenchmark-jdk11-results.txt new file mode 100644 index 0000000000000..747aae09272f9 --- /dev/null +++ b/core/benchmarks/MapStatusesSerDeserBenchmark-jdk11-results.txt @@ -0,0 +1,66 @@ +OpenJDK 64-Bit Server VM 11.0.4+11-post-Ubuntu-1ubuntu218.04.3 on Linux 4.15.0-1044-aws +Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz +200000 MapOutputs, 10 blocks w/ broadcast: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative +------------------------------------------------------------------------------------------------------------------------ +Serialization 609 631 22 0.3 3043.8 1.0X +Deserialization 840 897 67 0.2 4201.2 0.7X + +Compressed Serialized MapStatus sizes: 393 bytes +Compressed Serialized Broadcast MapStatus sizes: 3 MB + + +OpenJDK 64-Bit Server VM 11.0.4+11-post-Ubuntu-1ubuntu218.04.3 on Linux 4.15.0-1044-aws +Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz +200000 MapOutputs, 10 blocks w/o broadcast: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative +------------------------------------------------------------------------------------------------------------------------ +Serialization 591 599 8 0.3 2955.3 1.0X +Deserialization 878 913 31 0.2 4392.2 0.7X + +Compressed Serialized MapStatus sizes: 3 MB +Compressed Serialized Broadcast MapStatus sizes: 0 bytes + + +OpenJDK 64-Bit Server VM 11.0.4+11-post-Ubuntu-1ubuntu218.04.3 on Linux 4.15.0-1044-aws +Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz +200000 MapOutputs, 100 blocks w/ broadcast: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative +------------------------------------------------------------------------------------------------------------------------ +Serialization 1776 1778 2 0.1 8880.5 1.0X +Deserialization 1086 1086 0 0.2 5427.9 1.6X + +Compressed Serialized MapStatus sizes: 411 bytes +Compressed Serialized Broadcast MapStatus sizes: 15 MB + + +OpenJDK 
64-Bit Server VM 11.0.4+11-post-Ubuntu-1ubuntu218.04.3 on Linux 4.15.0-1044-aws +Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz +200000 MapOutputs, 100 blocks w/o broadcast: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative +------------------------------------------------------------------------------------------------------------------------ +Serialization 1725 1726 1 0.1 8624.9 1.0X +Deserialization 1093 1094 2 0.2 5463.6 1.6X + +Compressed Serialized MapStatus sizes: 15 MB +Compressed Serialized Broadcast MapStatus sizes: 0 bytes + + +OpenJDK 64-Bit Server VM 11.0.4+11-post-Ubuntu-1ubuntu218.04.3 on Linux 4.15.0-1044-aws +Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz +200000 MapOutputs, 1000 blocks w/ broadcast: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative +------------------------------------------------------------------------------------------------------------------------ +Serialization 12421 12522 142 0.0 62104.4 1.0X +Deserialization 3020 3043 32 0.1 15102.0 4.1X + +Compressed Serialized MapStatus sizes: 544 bytes +Compressed Serialized Broadcast MapStatus sizes: 131 MB + + +OpenJDK 64-Bit Server VM 11.0.4+11-post-Ubuntu-1ubuntu218.04.3 on Linux 4.15.0-1044-aws +Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz +200000 MapOutputs, 1000 blocks w/o broadcast: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative +------------------------------------------------------------------------------------------------------------------------ +Serialization 11719 11737 26 0.0 58595.3 1.0X +Deserialization 3018 3051 46 0.1 15091.7 3.9X + +Compressed Serialized MapStatus sizes: 131 MB +Compressed Serialized Broadcast MapStatus sizes: 0 bytes + + diff --git a/core/benchmarks/MapStatusesSerDeserBenchmark-results.txt b/core/benchmarks/MapStatusesSerDeserBenchmark-results.txt new file mode 100644 index 0000000000000..1f479a49d5860 --- /dev/null +++ b/core/benchmarks/MapStatusesSerDeserBenchmark-results.txt @@ -0,0 +1,66 @@ +OpenJDK 
64-Bit Server VM 1.8.0_222-8u222-b10-1ubuntu1~18.04.1-b10 on Linux 4.15.0-1044-aws +Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz +200000 MapOutputs, 10 blocks w/ broadcast: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative +------------------------------------------------------------------------------------------------------------------------ +Serialization 625 639 9 0.3 3127.2 1.0X +Deserialization 875 931 49 0.2 4376.2 0.7X + +Compressed Serialized MapStatus sizes: 393 bytes +Compressed Serialized Broadcast MapStatus sizes: 3 MB + + +OpenJDK 64-Bit Server VM 1.8.0_222-8u222-b10-1ubuntu1~18.04.1-b10 on Linux 4.15.0-1044-aws +Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz +200000 MapOutputs, 10 blocks w/o broadcast: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative +------------------------------------------------------------------------------------------------------------------------ +Serialization 604 640 71 0.3 3018.4 1.0X +Deserialization 889 903 17 0.2 4443.8 0.7X + +Compressed Serialized MapStatus sizes: 3 MB +Compressed Serialized Broadcast MapStatus sizes: 0 bytes + + +OpenJDK 64-Bit Server VM 1.8.0_222-8u222-b10-1ubuntu1~18.04.1-b10 on Linux 4.15.0-1044-aws +Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz +200000 MapOutputs, 100 blocks w/ broadcast: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative +------------------------------------------------------------------------------------------------------------------------ +Serialization 1879 1880 2 0.1 9394.9 1.0X +Deserialization 1147 1150 5 0.2 5733.8 1.6X + +Compressed Serialized MapStatus sizes: 411 bytes +Compressed Serialized Broadcast MapStatus sizes: 15 MB + + +OpenJDK 64-Bit Server VM 1.8.0_222-8u222-b10-1ubuntu1~18.04.1-b10 on Linux 4.15.0-1044-aws +Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz +200000 MapOutputs, 100 blocks w/o broadcast: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative 
+------------------------------------------------------------------------------------------------------------------------ +Serialization 1825 1826 1 0.1 9123.3 1.0X +Deserialization 1147 1281 189 0.2 5735.7 1.6X + +Compressed Serialized MapStatus sizes: 15 MB +Compressed Serialized Broadcast MapStatus sizes: 0 bytes + + +OpenJDK 64-Bit Server VM 1.8.0_222-8u222-b10-1ubuntu1~18.04.1-b10 on Linux 4.15.0-1044-aws +Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz +200000 MapOutputs, 1000 blocks w/ broadcast: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative +------------------------------------------------------------------------------------------------------------------------ +Serialization 12327 12518 270 0.0 61634.3 1.0X +Deserialization 3120 3133 18 0.1 15600.8 4.0X + +Compressed Serialized MapStatus sizes: 544 bytes +Compressed Serialized Broadcast MapStatus sizes: 131 MB + + +OpenJDK 64-Bit Server VM 1.8.0_222-8u222-b10-1ubuntu1~18.04.1-b10 on Linux 4.15.0-1044-aws +Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz +200000 MapOutputs, 1000 blocks w/o broadcast: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative +------------------------------------------------------------------------------------------------------------------------ +Serialization 11928 11986 82 0.0 59642.2 1.0X +Deserialization 3137 3138 2 0.1 15683.3 3.8X + +Compressed Serialized MapStatus sizes: 131 MB +Compressed Serialized Broadcast MapStatus sizes: 0 bytes + + diff --git a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala index 24da855633db3..c181fac8b4d8e 100644 --- a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala +++ b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala @@ -371,8 +371,8 @@ private[spark] abstract class MapOutputTracker(conf: SparkConf) extends Logging */ private[spark] class MapOutputTrackerMaster( conf: SparkConf, - broadcastManager: BroadcastManager, - isLocal: 
Boolean) + private[spark] val broadcastManager: BroadcastManager, + private[spark] val isLocal: Boolean) extends MapOutputTracker(conf) { // The size at which we use Broadcast to send the map output statuses to the executors diff --git a/core/src/test/scala/org/apache/spark/MapStatusesSerDeserBenchmark.scala b/core/src/test/scala/org/apache/spark/MapStatusesSerDeserBenchmark.scala new file mode 100644 index 0000000000000..53afe141981f4 --- /dev/null +++ b/core/src/test/scala/org/apache/spark/MapStatusesSerDeserBenchmark.scala @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark + +import org.apache.spark.benchmark.Benchmark +import org.apache.spark.benchmark.BenchmarkBase +import org.apache.spark.scheduler.CompressedMapStatus +import org.apache.spark.storage.BlockManagerId + +/** + * Benchmark for MapStatuses serialization & deserialization performance. + * {{{ + * To run this benchmark: + * 1. without sbt: bin/spark-submit --class org.apache.spark.MapStatusesSerDeserBenchmark --jars [spark core test jar] + * 2. build/sbt "core/test:runMain org.apache.spark.MapStatusesSerDeserBenchmark" + * 3. generate result: SPARK_GENERATE_BENCHMARK_FILES=1 build/sbt "core/test:runMain org.apache.spark.MapStatusesSerDeserBenchmark" + * Results will be written to "benchmarks/MapStatusesSerDeserBenchmark-results.txt". 
+ * }}} + */ +object MapStatusesSerDeserBenchmark extends BenchmarkBase { + + var sc: SparkContext = null + var tracker: MapOutputTrackerMaster = null + + def serDeserBenchmark(numMaps: Int, blockSize: Int, enableBroadcast: Boolean): Unit = { + val minBroadcastSize = if (enableBroadcast) { + 0 + } else { + Int.MaxValue + } + + val benchmark = new Benchmark(s"$numMaps MapOutputs, $blockSize blocks " + { + if (enableBroadcast) "w/ " else "w/o " + } + "broadcast", numMaps, output = output) + + val shuffleId = 10 + + tracker.registerShuffle(shuffleId, numMaps) + val r = new scala.util.Random(912) + (0 until numMaps).foreach { i => + tracker.registerMapOutput(shuffleId, i, + new CompressedMapStatus(BlockManagerId(s"node$i", s"node$i.spark.apache.org", 1000), + Array.fill(blockSize) { + // Creating block size ranging from 0byte to 1GB + (r.nextDouble() * 1024 * 1024 * 1024).toLong + }, i)) + } + + val shuffleStatus = tracker.shuffleStatuses.get(shuffleId).head + + var serializedMapStatusSizes = 0 + var serializedBroadcastSizes = 0 + + val (serializedMapStatus, serializedBroadcast) = MapOutputTracker.serializeMapStatuses( + shuffleStatus.mapStatuses, tracker.broadcastManager, tracker.isLocal, minBroadcastSize) + serializedMapStatusSizes = serializedMapStatus.length + if (serializedBroadcast != null) { + serializedBroadcastSizes = serializedBroadcast.value.length + } + + benchmark.addCase("Serialization") { _ => + MapOutputTracker.serializeMapStatuses( + shuffleStatus.mapStatuses, tracker.broadcastManager, tracker.isLocal, minBroadcastSize) + } + + benchmark.addCase("Deserialization") { _ => + val result = MapOutputTracker.deserializeMapStatuses(serializedMapStatus) + assert(result.length == numMaps) + } + + benchmark.run() + // scalastyle:off + import org.apache.commons.io.FileUtils + benchmark.out.println("Compressed Serialized MapStatus sizes: " + + FileUtils.byteCountToDisplaySize(serializedMapStatusSizes)) + benchmark.out.println("Compressed Serialized Broadcast 
MapStatus sizes: " + + FileUtils.byteCountToDisplaySize(serializedBroadcastSizes) + "\n\n") + // scalastyle:on + + tracker.unregisterShuffle(shuffleId) + } + + override def runBenchmarkSuite(mainArgs: Array[String]): Unit = { + createSparkContext() + tracker = sc.env.mapOutputTracker.asInstanceOf[MapOutputTrackerMaster] + val rpcEnv = sc.env.rpcEnv + val masterEndpoint = new MapOutputTrackerMasterEndpoint(rpcEnv, tracker, sc.getConf) + rpcEnv.stop(tracker.trackerEndpoint) + rpcEnv.setupEndpoint(MapOutputTracker.ENDPOINT_NAME, masterEndpoint) + + serDeserBenchmark(200000, 10, true) + serDeserBenchmark(200000, 10, false) + + serDeserBenchmark(200000, 100, true) + serDeserBenchmark(200000, 100, false) + + serDeserBenchmark(200000, 1000, true) + serDeserBenchmark(200000, 1000, false) + } + + def createSparkContext(): Unit = { + val conf = new SparkConf() + if (sc != null) { + sc.stop() + } + sc = new SparkContext("local", "MapStatusesSerializationBenchmark", conf) + } + + override def afterAll(): Unit = { + tracker.stop() + if (sc != null) { + sc.stop() + } + } +}