From ad006a4f2e147ba70201d2fd40a82ede887e510e Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Wed, 6 May 2015 13:01:20 -0700 Subject: [PATCH 1/3] [SPARK-7375] Avoid defensive copying in exchange operator when sort.serializeMapOutputs takes effect. --- .../apache/spark/sql/execution/Exchange.scala | 99 +++++++++++++------ 1 file changed, 70 insertions(+), 29 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala index 5b2e46962cd3..9aacdc99e9be 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala @@ -59,12 +59,63 @@ case class Exchange( override def output: Seq[Attribute] = child.output - /** We must copy rows when sort based shuffle is on */ - protected def sortBasedShuffleOn = SparkEnv.get.shuffleManager.isInstanceOf[SortShuffleManager] + private val sortBasedShuffleOn = SparkEnv.get.shuffleManager.isInstanceOf[SortShuffleManager] private val bypassMergeThreshold = child.sqlContext.sparkContext.conf.getInt("spark.shuffle.sort.bypassMergeThreshold", 200) + private val serializeMapOutputs = + child.sqlContext.sparkContext.conf.getBoolean("spark.shuffle.sort.serializeMapOutputs", true) + + /** + * Determines whether records must be defensively copied before being sent to the shuffle. + * Several of Spark's shuffle components will buffer deserialized Java objects in memory. The + * shuffle code assumes that objects are immutable and hence does not perform its own defensive + * copying. In Spark SQL, however, operators' iterators return the same mutable `Row` object. In + * order to properly shuffle the output of these operators, we need to perform our own copying + * prior to sending records to the shuffle. This copying is expensive, so we try to avoid it + * whenever possible. This method encapsulates the logic for choosing when to copy. + * + * In the long run, we might want to push this logic into core's shuffle APIs so that we don't + * have to rely on knowledge of core internals here in SQL. + * + * See SPARK-2967, SPARK-4479, and SPARK-7375 for more discussion of this issue. + * + * @param numPartitions the number of output partitions produced by the shuffle + * @param serializer the serializer that will be used to write rows + * @return true if rows should be copied before being shuffled, false otherwise + */ + private def needToCopyObjectsBeforeShuffle( + numPartitions: Int, + serializer: Serializer): Boolean = { + if (newOrdering.nonEmpty) { + // If a new ordering is required, then records will be sorted with Spark's `ExternalSorter`, + // which requires a defensive copy. + true + } else if (sortBasedShuffleOn) { + // Spark's sort-based shuffle also uses `ExternalSorter` to buffer records in memory. + // However, there are two special cases where we can avoid the copy, described below: + if (numPartitions <= bypassMergeThreshold) { + // If the number of output partitions is sufficiently small, then Spark will fall back to + // the old hash-based shuffle write path which doesn't buffer deserialized records. + // Note that we'll have to remove this case if we fix SPARK-6026 and remove this bypass. + false + } else if (serializeMapOutputs && serializer.supportsRelocationOfSerializedObjects) { + // SPARK-4550 extended sort-based shuffle to serialize individual records prior to sorting + // them. 
This optimization is guarded by a feature-flag and is only applied in cases where + // shuffle dependency does not specify an ordering and the record serializer has certain + // properties. If this optimization is enabled, we can safely avoid the copy. + false + } else { + // None of the special cases held, so we must copy. + true + } + } else { + // We're using hash-based shuffle, so we don't need to copy. + false + } + } + private val keyOrdering = { if (newOrdering.nonEmpty) { val key = newPartitioning.keyExpressions @@ -81,7 +132,7 @@ case class Exchange( @transient private lazy val sparkConf = child.sqlContext.sparkContext.getConf - def serializer( + private def getSerializer( keySchema: Array[DataType], valueSchema: Array[DataType], numPartitions: Int): Serializer = { @@ -123,17 +174,11 @@ case class Exchange( override def execute(): RDD[Row] = attachTree(this , "execute") { newPartitioning match { case HashPartitioning(expressions, numPartitions) => - // TODO: Eliminate redundant expressions in grouping key and value. - // This is a workaround for SPARK-4479. When: - // 1. sort based shuffle is on, and - // 2. the partition number is under the merge threshold, and - // 3. no ordering is required - // we can avoid the defensive copies to improve performance. In the long run, we probably - // want to include information in shuffle dependencies to indicate whether elements in the - // source RDD should be copied. - val willMergeSort = sortBasedShuffleOn && numPartitions > bypassMergeThreshold - - val rdd = if (willMergeSort || newOrdering.nonEmpty) { + val keySchema = expressions.map(_.dataType).toArray + val valueSchema = child.output.map(_.dataType).toArray + val serializer = getSerializer(keySchema, valueSchema, numPartitions) + + val rdd = if (needToCopyObjectsBeforeShuffle(numPartitions, serializer)) { child.execute().mapPartitions { iter => val hashExpressions = newMutableProjection(expressions, child.output)() iter.map(r => (hashExpressions(r).copy(), r.copy())) @@ -152,14 +197,14 @@ case class Exchange( } else { new ShuffledRDD[Row, Row, Row](rdd, part) } - val keySchema = expressions.map(_.dataType).toArray - val valueSchema = child.output.map(_.dataType).toArray - shuffled.setSerializer(serializer(keySchema, valueSchema, numPartitions)) - + shuffled.setSerializer(serializer) shuffled.map(_._2) case RangePartitioning(sortingExpressions, numPartitions) => - val rdd = if (sortBasedShuffleOn || newOrdering.nonEmpty) { + val keySchema = child.output.map(_.dataType).toArray + val serializer = getSerializer(keySchema, null, numPartitions) + + val rdd = if (needToCopyObjectsBeforeShuffle(numPartitions, serializer)) { child.execute().mapPartitions { iter => iter.map(row => (row.copy(), null))} } else { child.execute().mapPartitions { iter => @@ -178,17 +223,14 @@ case class Exchange( } else { new ShuffledRDD[Row, Null, Null](rdd, part) } - val keySchema = child.output.map(_.dataType).toArray - shuffled.setSerializer(serializer(keySchema, null, numPartitions)) - + shuffled.setSerializer(serializer) shuffled.map(_._1) case SinglePartition => - // SPARK-4479: Can't turn off defensive copy as what we do for `HashPartitioning`, since - // operators like `TakeOrdered` may require an ordering within the partition, and currently - // `SinglePartition` doesn't include ordering information. 
- // TODO Add `SingleOrderedPartition` for operators like `TakeOrdered` - val rdd = if (sortBasedShuffleOn) { + val valueSchema = child.output.map(_.dataType).toArray + val serializer = getSerializer(null, valueSchema, 1) + + val rdd = if (needToCopyObjectsBeforeShuffle(numPartitions = 1, serializer)) { child.execute().mapPartitions { iter => iter.map(r => (null, r.copy())) } } else { child.execute().mapPartitions { iter => @@ -198,8 +240,7 @@ case class Exchange( } val partitioner = new HashPartitioner(1) val shuffled = new ShuffledRDD[Null, Row, Row](rdd, partitioner) - val valueSchema = child.output.map(_.dataType).toArray - shuffled.setSerializer(serializer(null, valueSchema, 1)) + shuffled.setSerializer(serializer) shuffled.map(_._2) case _ => sys.error(s"Exchange not implemented for $newPartitioning") From 6a6bfce0d3a1e259083f3ed111c97f1f27dd41ea Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Wed, 6 May 2015 14:25:46 -0700 Subject: [PATCH 2/3] Fix issue related to RangePartitioning: - We now defensively copy before computing the partition bounds, which is necessary in order to get accurate sampling. - We now pass the actual partitioner into needToCopyObjectsBeforeShuffle(), which guards against the fact that RangePartitioner may produce a shuffle with fewer than `numPartitions` partitions. --- .../apache/spark/sql/execution/Exchange.scala | 65 +++++++++++-------- 1 file changed, 37 insertions(+), 28 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala index 9aacdc99e9be..1f6f3d3e8210 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala @@ -19,7 +19,7 @@ package org.apache.spark.sql.execution import org.apache.spark.annotation.DeveloperApi import org.apache.spark.shuffle.sort.SortShuffleManager -import org.apache.spark.{SparkEnv, HashPartitioner, RangePartitioner} +import org.apache.spark.{HashPartitioner, Partitioner, RangePartitioner, SparkEnv} import org.apache.spark.rdd.{RDD, ShuffledRDD} import org.apache.spark.serializer.Serializer import org.apache.spark.sql.{SQLContext, Row} @@ -81,13 +81,17 @@ case class Exchange( * * See SPARK-2967, SPARK-4479, and SPARK-7375 for more discussion of this issue. * - * @param numPartitions the number of output partitions produced by the shuffle + * @param partitioner the partitioner for the shuffle * @param serializer the serializer that will be used to write rows * @return true if rows should be copied before being shuffled, false otherwise */ private def needToCopyObjectsBeforeShuffle( - numPartitions: Int, + partitioner: Partitioner, serializer: Serializer): Boolean = { + // Note: even though we only use the partitioner's `numPartitions` field, we require it to be + // passed instead of directly passing the number of partitions in order to guard against + // corner-cases where a partitioner constructed with `numPartitions` partitions may output + // fewer partitions (like RangeParittioner, for example). if (newOrdering.nonEmpty) { // If a new ordering is required, then records will be sorted with Spark's `ExternalSorter`, // which requires a defensive copy. @@ -95,7 +99,7 @@ case class Exchange( } else if (sortBasedShuffleOn) { // Spark's sort-based shuffle also uses `ExternalSorter` to buffer records in memory. 
// However, there are two special cases where we can avoid the copy, described below: - if (numPartitions <= bypassMergeThreshold) { + if (partitioner.numPartitions <= bypassMergeThreshold) { // If the number of output partitions is sufficiently small, then Spark will fall back to // the old hash-based shuffle write path which doesn't buffer deserialized records. // Note that we'll have to remove this case if we fix SPARK-6026 and remove this bypass. @@ -177,8 +181,9 @@ case class Exchange( val keySchema = expressions.map(_.dataType).toArray val valueSchema = child.output.map(_.dataType).toArray val serializer = getSerializer(keySchema, valueSchema, numPartitions) + val part = new HashPartitioner(numPartitions) - val rdd = if (needToCopyObjectsBeforeShuffle(numPartitions, serializer)) { + val rdd = if (needToCopyObjectsBeforeShuffle(part, serializer)) { child.execute().mapPartitions { iter => val hashExpressions = newMutableProjection(expressions, child.output)() iter.map(r => (hashExpressions(r).copy(), r.copy())) @@ -190,13 +195,10 @@ case class Exchange( iter.map(r => mutablePair.update(hashExpressions(r), r)) } } - val part = new HashPartitioner(numPartitions) - val shuffled = - if (newOrdering.nonEmpty) { - new ShuffledRDD[Row, Row, Row](rdd, part).setKeyOrdering(keyOrdering) - } else { - new ShuffledRDD[Row, Row, Row](rdd, part) - } + val shuffled = new ShuffledRDD[Row, Row, Row](rdd, part) + if (newOrdering.nonEmpty) { + shuffled.setKeyOrdering(keyOrdering) + } shuffled.setSerializer(serializer) shuffled.map(_._2) @@ -204,33 +206,41 @@ case class Exchange( val keySchema = child.output.map(_.dataType).toArray val serializer = getSerializer(keySchema, null, numPartitions) - val rdd = if (needToCopyObjectsBeforeShuffle(numPartitions, serializer)) { - child.execute().mapPartitions { iter => iter.map(row => (row.copy(), null))} + val childRdd = child.execute() + val part: Partitioner = { + // Internally, RangePartitioner runs a job on the RDD that samples keys to compute + // partition bounds. To get accurate samples, we need to copy the mutable keys. + val rddForSampling = childRdd.mapPartitions { iter => + val mutablePair = new MutablePair[Row, Null]() + iter.map(row => mutablePair.update(row.copy(), null)) + } + // TODO: RangePartitioner should take an Ordering. + implicit val ordering = new RowOrdering(sortingExpressions, child.output) + new RangePartitioner(numPartitions, rddForSampling, ascending = true) + } + + val rdd = if (needToCopyObjectsBeforeShuffle(part, serializer)) { + childRdd.mapPartitions { iter => iter.map(row => (row.copy(), null))} } else { - child.execute().mapPartitions { iter => - val mutablePair = new MutablePair[Row, Null](null, null) + childRdd.mapPartitions { iter => + val mutablePair = new MutablePair[Row, Null]() iter.map(row => mutablePair.update(row, null)) } } - // TODO: RangePartitioner should take an Ordering. 
- implicit val ordering = new RowOrdering(sortingExpressions, child.output) - - val part = new RangePartitioner(numPartitions, rdd, ascending = true) - val shuffled = - if (newOrdering.nonEmpty) { - new ShuffledRDD[Row, Null, Null](rdd, part).setKeyOrdering(keyOrdering) - } else { - new ShuffledRDD[Row, Null, Null](rdd, part) - } + val shuffled = new ShuffledRDD[Row, Null, Null](rdd, part) + if (newOrdering.nonEmpty) { + shuffled.setKeyOrdering(keyOrdering) + } shuffled.setSerializer(serializer) shuffled.map(_._1) case SinglePartition => val valueSchema = child.output.map(_.dataType).toArray val serializer = getSerializer(null, valueSchema, 1) + val partitioner = new HashPartitioner(1) - val rdd = if (needToCopyObjectsBeforeShuffle(numPartitions = 1, serializer)) { + val rdd = if (needToCopyObjectsBeforeShuffle(partitioner, serializer)) { child.execute().mapPartitions { iter => iter.map(r => (null, r.copy())) } } else { child.execute().mapPartitions { iter => @@ -238,7 +248,6 @@ case class Exchange( iter.map(r => mutablePair.update(null, r)) } } - val partitioner = new HashPartitioner(1) val shuffled = new ShuffledRDD[Null, Row, Row](rdd, partitioner) shuffled.setSerializer(serializer) shuffled.map(_._2) From f305ff3f498348b86bc7929378e32c5f36141d6d Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Fri, 8 May 2015 10:59:55 -0700 Subject: [PATCH 3/3] Reduce scope of some variables in Exchange --- .../org/apache/spark/sql/execution/Exchange.scala | 14 +++++--------- 1 file changed, 5 insertions(+), 9 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala index 0fb7f40285e1..a98246572c75 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala @@ -59,14 +59,6 @@ case class Exchange( override def output: Seq[Attribute] = child.output - private val sortBasedShuffleOn = SparkEnv.get.shuffleManager.isInstanceOf[SortShuffleManager] - - private val bypassMergeThreshold = - child.sqlContext.sparkContext.conf.getInt("spark.shuffle.sort.bypassMergeThreshold", 200) - - private val serializeMapOutputs = - child.sqlContext.sparkContext.conf.getBoolean("spark.shuffle.sort.serializeMapOutputs", true) - /** * Determines whether records must be defensively copied before being sent to the shuffle. * Several of Spark's shuffle components will buffer deserialized Java objects in memory. The @@ -91,7 +83,11 @@ case class Exchange( // Note: even though we only use the partitioner's `numPartitions` field, we require it to be // passed instead of directly passing the number of partitions in order to guard against // corner-cases where a partitioner constructed with `numPartitions` partitions may output - // fewer partitions (like RangeParittioner, for example). + // fewer partitions (like RangePartitioner, for example). + val conf = child.sqlContext.sparkContext.conf + val sortBasedShuffleOn = SparkEnv.get.shuffleManager.isInstanceOf[SortShuffleManager] + val bypassMergeThreshold = conf.getInt("spark.shuffle.sort.bypassMergeThreshold", 200) + val serializeMapOutputs = conf.getBoolean("spark.shuffle.sort.serializeMapOutputs", true) if (newOrdering.nonEmpty) { // If a new ordering is required, then records will be sorted with Spark's `ExternalSorter`, // which requires a defensive copy.
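
The three patches converge on a single decision procedure. As a reading aid (not part of the patch itself), here is a minimal standalone sketch of needToCopyObjectsBeforeShuffle in its final, patch-3 form. The boolean and integer parameters are stand-ins for state that the real method reads itself from newOrdering, SparkEnv, and the SQLContext conf; serializerSupportsRelocation stands in for serializer.supportsRelocationOfSerializedObjects, a private[spark] method that Exchange can call only because it lives under org.apache.spark:

  import org.apache.spark.Partitioner

  object CopyDecisionSketch {
    def needToCopyObjectsBeforeShuffle(
        partitioner: Partitioner,             // passed whole so numPartitions reflects reality
        hasKeyOrdering: Boolean,              // stand-in for newOrdering.nonEmpty
        sortBasedShuffleOn: Boolean,          // is the shuffle manager a SortShuffleManager?
        bypassMergeThreshold: Int,            // spark.shuffle.sort.bypassMergeThreshold (default 200)
        serializeMapOutputs: Boolean,         // spark.shuffle.sort.serializeMapOutputs (default true)
        serializerSupportsRelocation: Boolean): Boolean = {
      if (hasKeyOrdering) {
        // ExternalSorter buffers deserialized rows in memory, so mutable rows must be copied.
        true
      } else if (sortBasedShuffleOn) {
        if (partitioner.numPartitions <= bypassMergeThreshold) {
          // The bypass path writes each record straight to its partition file: no copy needed.
          false
        } else if (serializeMapOutputs && serializerSupportsRelocation) {
          // The SPARK-4550 path serializes records before buffering them, so later mutation
          // of the source row cannot corrupt bytes already written: no copy needed.
          false
        } else {
          // Sort-based shuffle with deserialized buffering: must copy.
          true
        }
      } else {
        // Hash-based shuffle writes records as they arrive: no copy needed.
        false
      }
    }
  }

For example, with a new HashPartitioner(1024), no required key ordering, sort-based shuffle enabled, and a serializer that supports relocation of serialized objects, the sketch returns false: that is exactly the defensive copy this series eliminates. Note also why the partitioner is passed whole rather than as a partition count: as the patch-2 comment explains, a RangePartitioner constructed for numPartitions partitions may produce fewer, and the decision must be made against the partitioner's actual numPartitions.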