[SPARK-7081] Faster sort-based shuffle path using binary processing cache-aware sort #5868
Changes from 1 commit: …nsafeShuffle path is used.
@@ -22,7 +22,7 @@ import java.util

 import com.esotericsoftware.kryo.io.ByteBufferOutputStream

-import org.apache.spark.{ShuffleDependency, SparkConf, SparkEnv, TaskContext}
+import org.apache.spark._
 import org.apache.spark.executor.ShuffleWriteMetrics
 import org.apache.spark.scheduler.MapStatus
 import org.apache.spark.serializer.Serializer

@@ -34,17 +34,31 @@ import org.apache.spark.unsafe.memory.{MemoryBlock, TaskMemoryManager}
 import org.apache.spark.unsafe.sort.UnsafeSorter
 import org.apache.spark.unsafe.sort.UnsafeSorter.{KeyPointerAndPrefix, PrefixComparator, PrefixComputer, RecordComparator}

-private[spark] class UnsafeShuffleHandle[K, V](
+private class UnsafeShuffleHandle[K, V](
     shuffleId: Int,
     override val numMaps: Int,
     override val dependency: ShuffleDependency[K, V, V])
   extends BaseShuffleHandle(shuffleId, numMaps, dependency) {
   require(UnsafeShuffleManager.canUseUnsafeShuffle(dependency))
 }

-private[spark] object UnsafeShuffleManager {
+private[spark] object UnsafeShuffleManager extends Logging {
   def canUseUnsafeShuffle[K, V, C](dependency: ShuffleDependency[K, V, C]): Boolean = {
-    dependency.aggregator.isEmpty && dependency.keyOrdering.isEmpty
+    val shufId = dependency.shuffleId
+    val serializer = Serializer.getSerializer(dependency.serializer)
+    if (!serializer.supportsRelocationOfSerializedObjects) {
+      log.debug(s"Can't use UnsafeShuffle for shuffle $shufId because the serializer, " +
Member: I propose to use `logWarning` here instead of `log.debug`.

Contributor (Author): I considered this, but I worry that it would result in extremely chatty logs, because many operations won't be able to use this new shuffle yet. For example, this would trigger a warning whenever one of the fallback cases above applies.

This is a tricky issue, especially as the number of special-case shuffle optimizations grows. It will be very easy for users to slightly change their programs in ways that trigger slower code paths (e.g. by switching from LZF to LZ4 compression). Conversely, this also creates the potential for small changes to yield huge secondary performance benefits in non-obvious ways: if a user were to switch from LZ4 to LZF, the current code would hit a more efficient shuffle merge path and might exhibit huge speed-ups, but the user might misattribute this to LZF being faster or offering better compression in general, whereas it is really the optimized merge path, activated by LZF's support for concatenation, that is responsible for the speed-up. This is a general issue that's probably worth exploring as part of a broader discussion of how to expose internal knowledge of performance optimizations back to end users.
+        s"${serializer.getClass.getName}, does not support object relocation")
+      false
+    } else if (dependency.aggregator.isDefined) {
+      log.debug(s"Can't use UnsafeShuffle for shuffle $shufId because an aggregator is defined")
+      false
+    } else if (dependency.keyOrdering.isDefined) {
+      log.debug(s"Can't use UnsafeShuffle for shuffle $shufId because a key ordering is defined")
+      false
+    } else {
+      log.debug(s"Can use UnsafeShuffle for shuffle $shufId")
+      true
+    }
   }
 }
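To make the eligibility checks above concrete, here is a hedged sketch of how common pair-RDD operations would map onto them. Which operator hits which branch is my reading of the three conditions (serializer relocation support, no aggregator, no key ordering), not something stated in this diff, and the object and app names are placeholders; it also assumes the new manager has been enabled via `spark.shuffle.manager`.

```scala
import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}

object ShufflePathDemo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("shuffle-path-demo"))
    val pairs = sc.parallelize(1 to 1000).map(i => (i % 8, i))

    // Eligibility per operator below is inferred from canUseUnsafeShuffle's checks.
    pairs.partitionBy(new HashPartitioner(8)).count() // no aggregator, no key ordering: eligible if the serializer supports relocation
    pairs.reduceByKey(_ + _).count()                  // defines an aggregator: falls back to the regular sort-based shuffle
    pairs.sortByKey().count()                         // defines a key ordering: falls back to the regular sort-based shuffle

    sc.stop()
  }
}
```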
@@ -73,15 +87,13 @@ private object PartitionerPrefixComparator extends PrefixComparator {
   }
 }

-private[spark] class UnsafeShuffleWriter[K, V](
+private class UnsafeShuffleWriter[K, V](
     shuffleBlockManager: IndexShuffleBlockManager,
     handle: UnsafeShuffleHandle[K, V],
     mapId: Int,
     context: TaskContext)
   extends ShuffleWriter[K, V] {

-  println("Construcing a new UnsafeShuffleWriter")
-
   private[this] val memoryManager: TaskMemoryManager = context.taskMemoryManager()

   private[this] val dep = handle.dependency
@@ -158,7 +170,6 @@ private[spark] class UnsafeShuffleWriter[K, V](
       memoryManager.encodePageNumberAndOffset(currentPage, currentPagePosition)
     PlatformDependent.UNSAFE.putLong(currentPage.getBaseObject, currentPagePosition, partitionId)
     currentPagePosition += 8
-    println("The stored record length is " + serializedRecordSize)
     PlatformDependent.UNSAFE.putLong(
       currentPage.getBaseObject, currentPagePosition, serializedRecordSize)
     currentPagePosition += 8

@@ -169,7 +180,6 @@ private[spark] class UnsafeShuffleWriter[K, V](
       currentPagePosition,
       serializedRecordSize)
     currentPagePosition += serializedRecordSize
-    println("After writing record, current page position is " + currentPagePosition)
     sorter.insertRecord(newRecordAddress)

     // Reset for writing the next record
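Reading the two hunks above together, each record appears to be laid out in the current page as an 8-byte partition id, then an 8-byte record length, then the serialized bytes, with the sorter receiving only the record's encoded address. A small illustrative sketch of that layout using a plain `ByteBuffer` (object and method names are placeholders; the real code writes through `PlatformDependent.UNSAFE` into task-managed memory pages):

```scala
import java.nio.ByteBuffer

object RecordLayoutSketch {
  // Illustrative layout only: [partitionId: Long][recordLength: Long][payload bytes].
  // Returns the offset at which the record starts, standing in for the encoded
  // record address that the real code hands to the sorter.
  def appendRecord(page: ByteBuffer, partitionId: Long, payload: Array[Byte]): Int = {
    val recordStart = page.position()
    page.putLong(partitionId)            // 8 bytes: the partition id the records are sorted by
    page.putLong(payload.length.toLong)  // 8 bytes: the serialized record's length
    page.put(payload)                    // the serialized record bytes
    recordStart
  }

  def main(args: Array[String]): Unit = {
    val page = ByteBuffer.allocate(1 << 20)
    val recordAddress = appendRecord(page, partitionId = 3L, payload = Array[Byte](1, 2, 3))
    println(s"record starts at offset $recordAddress")
  }
}
```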
@@ -195,8 +205,10 @@ private[spark] class UnsafeShuffleWriter[K, V](
     // TODO: don't close and re-open file handles so often; this could be inefficient

     def closePartition(): Unit = {
-      writer.commitAndClose()
-      partitionLengths(currentPartition) = writer.fileSegment().length
+      if (writer != null) {
+        writer.commitAndClose()
+        partitionLengths(currentPartition) = writer.fileSegment().length
+      }
     }

     def switchToPartition(newPartition: Int): Unit = {
@@ -219,8 +231,6 @@ private[spark] class UnsafeShuffleWriter[K, V](
       val baseObject = memoryManager.getPage(keyPointerAndPrefix.recordPointer)
       val baseOffset = memoryManager.getOffsetInPage(keyPointerAndPrefix.recordPointer)
       val recordLength: Int = PlatformDependent.UNSAFE.getLong(baseObject, baseOffset + 8).toInt
-      println("Base offset is " + baseOffset)
-      println("Record length is " + recordLength)
       // TODO: need to have a way to figure out whether a serializer supports relocation of
       // serialized objects or not. Sandy also ran into this in his patch (see
       // https://github.com/apache/spark/pull/4450). If we're using Java serialization, we might
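For readers following the TODO above: a serializer "supports relocation of serialized objects" when every serialized record is a self-contained byte range that does not depend on stream position or on earlier records (Java serialization's stream header and back-references break this), so sorted output can be produced with raw byte copies. A minimal, Spark-independent sketch of the idea (names are placeholders):

```scala
import java.io.ByteArrayOutputStream

object RelocationSketch {
  // Each record has already been serialized into its own byte chunk. Because no
  // chunk depends on the bytes that precede it, the chunks can be emitted in
  // sorted partition order with plain byte copies, never deserializing anything.
  def writeInPartitionOrder(chunks: Array[Array[Byte]], partitionIds: Array[Int]): Array[Byte] = {
    val out = new ByteArrayOutputStream()
    chunks.indices.sortBy(i => partitionIds(i)).foreach(i => out.write(chunks(i)))
    out.toByteArray
  }
}
```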
@@ -244,12 +254,8 @@ private[spark] class UnsafeShuffleWriter[K, V](

   /** Write a sequence of records to this task's output */
   override def write(records: Iterator[_ <: Product2[K, V]]): Unit = {
-    println("Opened writer!")
-
     val sortedIterator = sortRecords(records)
     val partitionLengths = writeSortedRecordsToFile(sortedIterator)
-
-    println("Partition lengths are " + partitionLengths.toSeq)
     shuffleBlockManager.writeIndexFile(dep.shuffleId, mapId, partitionLengths)
     mapStatus = MapStatus(blockManager.shuffleServerId, partitionLengths)
   }
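Taken together with the earlier hunks, the write path in this method is: sort the buffered records by partition, stream them to the data file one partition at a time while recording each partition's segment length, then write the index file and report a `MapStatus`. A toy, self-contained restatement of just the per-partition length bookkeeping (names are placeholders; the real code obtains lengths from `writer.fileSegment().length`):

```scala
object PartitionLengthsSketch {
  // Toy bookkeeping: records are (partitionId, serializedBytes) pairs already
  // sorted by partition id; the result mirrors the partitionLengths array that
  // feeds writeIndexFile and MapStatus in the diff above.
  def partitionLengthsOf(sortedRecords: Seq[(Int, Array[Byte])], numPartitions: Int): Array[Long] = {
    val lengths = new Array[Long](numPartitions)
    sortedRecords.foreach { case (partition, bytes) =>
      lengths(partition) += bytes.length.toLong
    }
    lengths
  }
}
```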
@@ -264,7 +270,6 @@ private[spark] class UnsafeShuffleWriter[K, V](

   /** Close this writer, passing along whether the map completed */
   override def stop(success: Boolean): Option[MapStatus] = {
-    println("Stopping unsafeshufflewriter")
     try {
       if (stopping) {
         None

@@ -300,7 +305,6 @@ private[spark] class UnsafeShuffleManager(conf: SparkConf) extends ShuffleManage
       numMaps: Int,
       dependency: ShuffleDependency[K, V, C]): ShuffleHandle = {
     if (UnsafeShuffleManager.canUseUnsafeShuffle(dependency)) {
-      println("Opening unsafeShuffleWriter")
       new UnsafeShuffleHandle[K, V](
         shuffleId, numMaps, dependency.asInstanceOf[ShuffleDependency[K, V, V]])
     } else {
@@ -131,8 +131,7 @@ private[spark] class ExternalSorter[K, V, C](
   private val kvChunkSize = conf.getInt("spark.shuffle.sort.kvChunkSize", 1 << 22) // 4 MB
   private val useSerializedPairBuffer =
     !ordering.isDefined && conf.getBoolean("spark.shuffle.sort.serializeMapOutputs", true) &&
-    ser.isInstanceOf[KryoSerializer] &&
-    serInstance.asInstanceOf[KryoSerializerInstance].getAutoReset
+    ser.supportsRelocationOfSerializedObjects
Contributor (Author): @sryza, this change is intended to partially address https://issues.apache.org/jira/browse/SPARK-7311.

   // Data structures to store in-memory objects before we spill. Depending on whether we have an
   // Aggregator set, we either put objects into an AppendOnlyMap where we combine them, or we
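As a usage note (my sketch, not part of this patch): per the replaced `isInstanceOf`/`getAutoReset` check, the serializer that satisfies the new condition in practice is Kryo with its default auto-reset behaviour, so a job opting into the serialized map-output buffer would typically be configured like this (object name is a placeholder):

```scala
import org.apache.spark.SparkConf

object KryoShuffleConf {
  // spark.shuffle.sort.serializeMapOutputs already defaults to true (see the diff
  // above); it is set here only to make the opt-in explicit.
  val conf: SparkConf = new SparkConf()
    .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    .set("spark.shuffle.sort.serializeMapOutputs", "true")
}
```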
Review comment on the `UnsafeShuffleHandle` constructor parameters: `override val` is redundant.