@@ -59,12 +59,63 @@ case class Exchange(

   override def output: Seq[Attribute] = child.output

-  /** We must copy rows when sort based shuffle is on */
-  protected def sortBasedShuffleOn = SparkEnv.get.shuffleManager.isInstanceOf[SortShuffleManager]
+  private val sortBasedShuffleOn = SparkEnv.get.shuffleManager.isInstanceOf[SortShuffleManager]

   private val bypassMergeThreshold =
     child.sqlContext.sparkContext.conf.getInt("spark.shuffle.sort.bypassMergeThreshold", 200)

+  private val serializeMapOutputs =
+    child.sqlContext.sparkContext.conf.getBoolean("spark.shuffle.sort.serializeMapOutputs", true)
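Note that both keys read above are ordinary Spark configuration flags, so this behavior is tunable per application. A minimal sketch (hypothetical app name; the values shown just restate the defaults used above) of setting them when building a context:

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical tuning sketch, not part of this patch: the values shown match
// the fallbacks above (bypass threshold 200, serialized map outputs enabled).
val conf = new SparkConf()
  .setAppName("exchange-copy-demo") // hypothetical
  .set("spark.shuffle.sort.bypassMergeThreshold", "200")
  .set("spark.shuffle.sort.serializeMapOutputs", "true")
val sc = new SparkContext(conf)
```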

+  /**
+   * Determines whether records must be defensively copied before being sent to the shuffle.
+   * Several of Spark's shuffle components will buffer deserialized Java objects in memory. The
+   * shuffle code assumes that objects are immutable and hence does not perform its own defensive
+   * copying. In Spark SQL, however, operators' iterators return the same mutable `Row` object. In
+   * order to properly shuffle the output of these operators, we need to perform our own copying
+   * prior to sending records to the shuffle. This copying is expensive, so we try to avoid it
+   * whenever possible. This method encapsulates the logic for choosing when to copy.
+   *
+   * In the long run, we might want to push this logic into core's shuffle APIs so that we don't
+   * have to rely on knowledge of core internals here in SQL.
+   *
+   * See SPARK-2967, SPARK-4479, and SPARK-7375 for more discussion of this issue.
+   *
+   * @param numPartitions the number of output partitions produced by the shuffle
+   * @param serializer the serializer that will be used to write rows
+   * @return true if rows should be copied before being shuffled, false otherwise
+   */
+  private def needToCopyObjectsBeforeShuffle(
+      numPartitions: Int,
+      serializer: Serializer): Boolean = {
+    if (newOrdering.nonEmpty) {
+      // If a new ordering is required, then records will be sorted with Spark's `ExternalSorter`,
+      // which requires a defensive copy.
+      true
+    } else if (sortBasedShuffleOn) {
+      // Spark's sort-based shuffle also uses `ExternalSorter` to buffer records in memory.
+      // However, there are two special cases where we can avoid the copy, described below:
+      if (numPartitions <= bypassMergeThreshold) {
+        // If the number of output partitions is sufficiently small, then Spark will fall back to
+        // the old hash-based shuffle write path which doesn't buffer deserialized records.
+        // Note that we'll have to remove this case if we fix SPARK-6026 and remove this bypass.
+        false
+      } else if (serializeMapOutputs && serializer.supportsRelocationOfSerializedObjects) {
+        // SPARK-4550 extended sort-based shuffle to serialize individual records prior to sorting
+        // them. This optimization is guarded by a feature-flag and is only applied in cases where
+        // the shuffle dependency does not specify an ordering and the record serializer has
+        // certain properties. If this optimization is enabled, we can safely avoid the copy.
+        false
+      } else {
+        // None of the special cases held, so we must copy.
+        true
+      }
+    } else {
+      // We're using hash-based shuffle, so we don't need to copy.
+      false
+    }
+  }
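A note on why the copy is needed at all: SQL operators reuse a single mutable `Row` across an iterator, so any shuffle component that buffers object references without immediately serializing them will see every buffered record mutate in place. A minimal self-contained sketch of that hazard (plain Scala, not Spark code; the reused array stands in for the mutable `Row`):

```scala
// Minimal sketch of the mutable-row hazard (not Spark code).
object MutableRowHazard extends App {
  val reused = new Array[Any](1) // stands in for an operator's mutable Row

  def rows: Iterator[Array[Any]] =
    (1 to 3).iterator.map { i => reused(0) = i; reused }

  // Buffering without a defensive copy: all three entries alias one object,
  // so every buffered "row" ends up showing the last value written.
  println(rows.toArray.map(_(0)).mkString(", ")) // prints: 3, 3, 3

  // Copying before buffering preserves each record, at some allocation cost.
  println(rows.map(_.clone()).toArray.map(_(0)).mkString(", ")) // prints: 1, 2, 3
}
```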

   private val keyOrdering = {
     if (newOrdering.nonEmpty) {
       val key = newPartitioning.keyExpressions
@@ -81,7 +132,7 @@ case class Exchange(

   @transient private lazy val sparkConf = child.sqlContext.sparkContext.getConf

-  def serializer(
+  private def getSerializer(
       keySchema: Array[DataType],
       valueSchema: Array[DataType],
       numPartitions: Int): Serializer = {
@@ -123,17 +174,11 @@ case class Exchange(
   override def execute(): RDD[Row] = attachTree(this , "execute") {
     newPartitioning match {
       case HashPartitioning(expressions, numPartitions) =>
         // TODO: Eliminate redundant expressions in grouping key and value.
-        // This is a workaround for SPARK-4479. When:
-        // 1. sort based shuffle is on, and
-        // 2. the partition number is under the merge threshold, and
-        // 3. no ordering is required
-        // we can avoid the defensive copies to improve performance. In the long run, we probably
-        // want to include information in shuffle dependencies to indicate whether elements in the
-        // source RDD should be copied.
-        val willMergeSort = sortBasedShuffleOn && numPartitions > bypassMergeThreshold

-        val rdd = if (willMergeSort || newOrdering.nonEmpty) {
+        val keySchema = expressions.map(_.dataType).toArray
+        val valueSchema = child.output.map(_.dataType).toArray
+        val serializer = getSerializer(keySchema, valueSchema, numPartitions)
Contributor Author commented:
I moved the serializer creation calls a bit earlier here, since we technically need to know whether the serializer supports object relocation in order to figure out whether spark.shuffle.sort.serializeMapOutputs will actually take effect. In theory, we might be able to avoid this check because SQL only uses SqlSerializer and SqlSerializer2, both of which support relocation. However, SqlSerializer extends KryoSerializer, and there's a rare corner case where user-provided Kryo registrators can change Kryo settings in a way that breaks Kryo's relocatability, causing the serialization optimization to be bypassed. As a result, I think it's safer to create the serializer earlier and perform the full safety check.
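To make that corner case concrete, here is a hypothetical registrator (illustrative only, not from this PR) that would make `KryoSerializer.supportsRelocationOfSerializedObjects` return false: with auto-reset disabled, Kryo keeps its internal reference table alive across records, so each record's bytes depend on what was written before it and serialized records can no longer be safely reordered.

```scala
import com.esotericsoftware.kryo.Kryo
import org.apache.spark.serializer.KryoRegistrator

// Hypothetical user registrator that breaks the relocation property:
// disabling auto-reset preserves Kryo state across records, making the
// serialized output of each record position-dependent.
class NonRelocatableRegistrator extends KryoRegistrator {
  override def registerClasses(kryo: Kryo): Unit = {
    kryo.setAutoReset(false)
  }
}
```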


+        val rdd = if (needToCopyObjectsBeforeShuffle(numPartitions, serializer)) {
           child.execute().mapPartitions { iter =>
             val hashExpressions = newMutableProjection(expressions, child.output)()
             iter.map(r => (hashExpressions(r).copy(), r.copy()))
@@ -152,14 +197,14 @@ case class Exchange(
         } else {
           new ShuffledRDD[Row, Row, Row](rdd, part)
         }
-        val keySchema = expressions.map(_.dataType).toArray
-        val valueSchema = child.output.map(_.dataType).toArray
-        shuffled.setSerializer(serializer(keySchema, valueSchema, numPartitions))

+        shuffled.setSerializer(serializer)
         shuffled.map(_._2)

       case RangePartitioning(sortingExpressions, numPartitions) =>
-        val rdd = if (sortBasedShuffleOn || newOrdering.nonEmpty) {
+        val keySchema = child.output.map(_.dataType).toArray
+        val serializer = getSerializer(keySchema, null, numPartitions)
+
+        val rdd = if (needToCopyObjectsBeforeShuffle(numPartitions, serializer)) {
           child.execute().mapPartitions { iter => iter.map(row => (row.copy(), null))}
         } else {
           child.execute().mapPartitions { iter =>
@@ -178,17 +223,14 @@ case class Exchange(
         } else {
           new ShuffledRDD[Row, Null, Null](rdd, part)
         }
-        val keySchema = child.output.map(_.dataType).toArray
-        shuffled.setSerializer(serializer(keySchema, null, numPartitions))

+        shuffled.setSerializer(serializer)
         shuffled.map(_._1)

       case SinglePartition =>
-        // SPARK-4479: Can't turn off defensive copy as what we do for `HashPartitioning`, since
-        // operators like `TakeOrdered` may require an ordering within the partition, and currently
-        // `SinglePartition` doesn't include ordering information.
-        // TODO Add `SingleOrderedPartition` for operators like `TakeOrdered`
-        val rdd = if (sortBasedShuffleOn) {
+        val valueSchema = child.output.map(_.dataType).toArray
+        val serializer = getSerializer(null, valueSchema, 1)
+
+        val rdd = if (needToCopyObjectsBeforeShuffle(numPartitions = 1, serializer)) {
           child.execute().mapPartitions { iter => iter.map(r => (null, r.copy())) }
         } else {
           child.execute().mapPartitions { iter =>
@@ -198,8 +240,7 @@ case class Exchange(
         }
         val partitioner = new HashPartitioner(1)
         val shuffled = new ShuffledRDD[Null, Row, Row](rdd, partitioner)
-        val valueSchema = child.output.map(_.dataType).toArray
-        shuffled.setSerializer(serializer(null, valueSchema, 1))
+        shuffled.setSerializer(serializer)
         shuffled.map(_._2)

       case _ => sys.error(s"Exchange not implemented for $newPartitioning")