[SPARK-7081] Faster sort-based shuffle path using binary processing cache-aware sort #5868
UnsafeShuffleManager.scala:

```scala
@@ -22,6 +22,9 @@ import org.apache.spark.serializer.Serializer
import org.apache.spark.shuffle._
import org.apache.spark.shuffle.sort.SortShuffleManager

/**
 * Subclass of [[BaseShuffleHandle]], used to identify when we've chosen to use the new shuffle.
 */
private class UnsafeShuffleHandle[K, V](
    shuffleId: Int,
    override val numMaps: Int,
@@ -30,6 +33,10 @@ private class UnsafeShuffleHandle[K, V](
}

private[spark] object UnsafeShuffleManager extends Logging {
  /**
   * Helper method for determining whether a shuffle should use the optimized unsafe shuffle
   * path or whether it should fall back to the original sort-based shuffle.
   */
  def canUseUnsafeShuffle[K, V, C](dependency: ShuffleDependency[K, V, C]): Boolean = {
    val shufId = dependency.shuffleId
    val serializer = Serializer.getSerializer(dependency.serializer)
@@ -43,13 +50,63 @@ private[spark] object UnsafeShuffleManager extends Logging {
    } else if (dependency.keyOrdering.isDefined) {
      log.debug(s"Can't use UnsafeShuffle for shuffle $shufId because a key ordering is defined")
      false
    } else if (dependency.partitioner.numPartitions > PackedRecordPointer.MAXIMUM_PARTITION_ID) {
```
> **Member:** This should be updated to `MAXIMUM_PARTITION_ID + 1`.
```scala
      log.debug(s"Can't use UnsafeShuffle for shuffle $shufId because it has more than " +
        s"${PackedRecordPointer.MAXIMUM_PARTITION_ID} partitions")
```
> **Member:** This should be updated to `MAXIMUM_PARTITION_ID + 1`.
```scala
      false
    } else {
      log.debug(s"Can use UnsafeShuffle for shuffle $shufId")
      true
    }
  }
}
```
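The 16-million-partition cap enforced above comes from the sorter's packed-pointer encoding described in the class doc further down: each record's sort key is a single 8-byte word whose high 24 bits hold the partition id, so ids above `PackedRecordPointer.MAXIMUM_PARTITION_ID` (16777215) cannot be encoded. A minimal sketch of the packing idea, with an assumed 24/40-bit split (the helper below is illustrative and is not the PR's actual `PackedRecordPointer`, whose low-bit layout may differ):

```scala
// Illustrative sketch only -- not the PR's PackedRecordPointer class.
// Packs a 24-bit partition id and a 40-bit record address into one Long,
// so the sort array spends exactly 8 bytes per record and a comparator
// can order records by partition using only the high bits.
object PackedPointerSketch {
  val MaxPartitionId: Int = (1 << 24) - 1  // 16777215, matching MAXIMUM_PARTITION_ID
  private val AddressMask: Long = (1L << 40) - 1

  def pack(recordAddress: Long, partitionId: Int): Long = {
    require(partitionId >= 0 && partitionId <= MaxPartitionId, "partition id must fit in 24 bits")
    (partitionId.toLong << 40) | (recordAddress & AddressMask)
  }

  def partitionId(packed: Long): Int = (packed >>> 40).toInt

  def recordAddress(packed: Long): Long = packed & AddressMask
}
```

This encoding also explains the off-by-one flagged in the review comments: a partitioner with `MAXIMUM_PARTITION_ID + 1` partitions only ever emits ids 0 through `MAXIMUM_PARTITION_ID`, all of which are representable, so the guard should only reject partition counts strictly greater than `MAXIMUM_PARTITION_ID + 1`.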
```scala
/**
 * A shuffle implementation that uses directly-managed memory to implement several performance
 * optimizations for certain types of shuffles. In cases where the new performance optimizations
 * cannot be applied, this shuffle manager delegates to [[SortShuffleManager]] to handle those
 * shuffles.
 *
 * UnsafeShuffleManager's optimizations will apply when _all_ of the following conditions hold:
 *
 *  - The shuffle dependency specifies no aggregation or output ordering.
 *  - The shuffle serializer supports relocation of serialized values (this is currently supported
 *    by KryoSerializer and Spark SQL's custom serializers).
 *  - The shuffle produces fewer than 16777216 output partitions.
 *  - No individual record is larger than 128 MB when serialized.
 *
 * In addition, extra spill-merging optimizations are automatically applied when the shuffle
 * compression codec supports concatenation of serialized streams. This is currently supported by
 * Spark's LZF compression codec.
 *
 * At a high level, UnsafeShuffleManager's design is similar to Spark's existing SortShuffleManager.
 * In sort-based shuffle, incoming records are sorted according to their target partition ids, then
 * written to a single map output file. Reducers fetch contiguous regions of this file in order to
 * read their portion of the map output. In cases where the map output data is too large to fit in
 * memory, sorted subsets of the output are spilled to disk and those on-disk files are merged
 * to produce the final output file.
 *
 * UnsafeShuffleManager optimizes this process in several ways:
 *
 *  - Its sort operates on serialized binary data rather than Java objects, which reduces memory
 *    consumption and GC overhead. This optimization requires the record serializer to have certain
 *    properties to allow serialized records to be re-ordered without requiring deserialization.
 *    See SPARK-4550, where this optimization was first proposed and implemented, for more details.
 *
 *  - It uses a specialized cache-efficient sorter ([[UnsafeShuffleExternalSorter]]) that sorts
 *    arrays of compressed record pointers and partition ids. By using only 8 bytes of space per
 *    record in the sorting array, more of the array fits in cache.
 *
 *  - The spill merging procedure operates on blocks of serialized records that belong to the same
 *    partition and does not need to deserialize records during the merge.
 *
 *  - When the spill compression codec supports concatenation of compressed data, the spill merge
 *    simply concatenates the serialized and compressed spill partitions to produce the final output
 *    partition. This allows efficient data copying methods, like NIO's `transferTo`, to be used
 *    and avoids the need to allocate decompression or copying buffers during the merge.
 *
 * For more details on UnsafeShuffleManager's design, see SPARK-7081.
 */
private[spark] class UnsafeShuffleManager(conf: SparkConf) extends ShuffleManager {
```
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. this is fine for wip - but for the final version let's make sure we have sufficient high level documentation in the form of javadoc
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yep, this is the next piece that I'm writing docs for :) |
```scala
  private[this] val sortShuffleManager: SortShuffleManager = new SortShuffleManager(conf)
```
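The final bullet in the doc comment — concatenating compressed spill data with NIO's `transferTo` — can be sketched briefly. The helper below illustrates that merge strategy but is not the PR's merge code; it assumes each input spill file already holds a complete, concatenable compressed stream for one partition:

```scala
import java.io.{File, FileInputStream, FileOutputStream}

object SpillConcatenationSketch {
  // Append each spill file's bytes to the output using zero-copy transferTo,
  // never decompressing or deserializing the records being merged.
  def concatenateSpills(spills: Seq[File], output: File): Unit = {
    val out = new FileOutputStream(output, true).getChannel
    try {
      for (spill <- spills) {
        val in = new FileInputStream(spill).getChannel
        try {
          var pos = 0L
          val size = in.size()
          // transferTo may copy fewer bytes than requested, so loop to completion.
          while (pos < size) {
            pos += in.transferTo(pos, size - pos, out)
          }
        } finally {
          in.close()
        }
      }
    } finally {
      out.close()
    }
  }
}
```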
UnsafeShuffleManagerSuite.scala (new file):

```scala
@@ -0,0 +1,128 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.shuffle.unsafe

import org.mockito.Mockito._
import org.mockito.invocation.InvocationOnMock
import org.mockito.stubbing.Answer
import org.scalatest.{FunSuite, Matchers}

import org.apache.spark._
import org.apache.spark.serializer.{JavaSerializer, KryoSerializer, Serializer}

/**
 * Tests for the fallback logic in UnsafeShuffleManager. Actual tests of shuffling data are
 * performed in other suites.
 */
class UnsafeShuffleManagerSuite extends FunSuite with Matchers {

  import UnsafeShuffleManager.canUseUnsafeShuffle

  private class RuntimeExceptionAnswer extends Answer[Object] {
    override def answer(invocation: InvocationOnMock): Object = {
      throw new RuntimeException("Called non-stubbed method, " + invocation.getMethod.getName)
    }
  }

  private def shuffleDep(
      partitioner: Partitioner,
      serializer: Option[Serializer],
      keyOrdering: Option[Ordering[Any]],
      aggregator: Option[Aggregator[Any, Any, Any]],
      mapSideCombine: Boolean): ShuffleDependency[Any, Any, Any] = {
    val dep = mock(classOf[ShuffleDependency[Any, Any, Any]], new RuntimeExceptionAnswer())
    doReturn(0).when(dep).shuffleId
    doReturn(partitioner).when(dep).partitioner
    doReturn(serializer).when(dep).serializer
    doReturn(keyOrdering).when(dep).keyOrdering
    doReturn(aggregator).when(dep).aggregator
    doReturn(mapSideCombine).when(dep).mapSideCombine
    dep
  }

  test("supported shuffle dependencies") {
    val kryo = Some(new KryoSerializer(new SparkConf()))

    assert(canUseUnsafeShuffle(shuffleDep(
      partitioner = new HashPartitioner(2),
      serializer = kryo,
      keyOrdering = None,
      aggregator = None,
      mapSideCombine = false
    )))

    val rangePartitioner = mock(classOf[RangePartitioner[Any, Any]])
    when(rangePartitioner.numPartitions).thenReturn(2)
    assert(canUseUnsafeShuffle(shuffleDep(
      partitioner = rangePartitioner,
      serializer = kryo,
      keyOrdering = None,
      aggregator = None,
      mapSideCombine = false
    )))
  }

  test("unsupported shuffle dependencies") {
    val kryo = Some(new KryoSerializer(new SparkConf()))
    val java = Some(new JavaSerializer(new SparkConf()))

    // We only support serializers that support object relocation
    assert(!canUseUnsafeShuffle(shuffleDep(
      partitioner = new HashPartitioner(2),
      serializer = java,
      keyOrdering = None,
      aggregator = None,
      mapSideCombine = false
    )))

    // We do not support shuffles with more than 16 million output partitions
    assert(!canUseUnsafeShuffle(shuffleDep(
      partitioner = new HashPartitioner(PackedRecordPointer.MAXIMUM_PARTITION_ID + 1),
      serializer = kryo,
      keyOrdering = None,
      aggregator = None,
      mapSideCombine = false
    )))

    // We do not support shuffles that perform any kind of aggregation or sorting of keys
    assert(!canUseUnsafeShuffle(shuffleDep(
      partitioner = new HashPartitioner(2),
      serializer = kryo,
      keyOrdering = Some(mock(classOf[Ordering[Any]])),
      aggregator = None,
      mapSideCombine = false
    )))
    assert(!canUseUnsafeShuffle(shuffleDep(
      partitioner = new HashPartitioner(2),
      serializer = kryo,
      keyOrdering = None,
      aggregator = Some(mock(classOf[Aggregator[Any, Any, Any]])),
      mapSideCombine = false
    )))
    assert(!canUseUnsafeShuffle(shuffleDep(
      partitioner = new HashPartitioner(2),
      serializer = kryo,
      keyOrdering = Some(mock(classOf[Ordering[Any]])),
      aggregator = Some(mock(classOf[Aggregator[Any, Any, Any]])),
      mapSideCombine = true
    )))
  }

}
```
> On `override val numMaps: Int`: `override val` is redundant.
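For completeness, a usage sketch: jobs would opt into the new path through Spark's existing `spark.shuffle.manager` setting, which accepts a shuffle manager class name. The exact wiring is outside this diff, so treat the configuration value below as an assumption based on the package used in this PR:

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Assumed configuration: point spark.shuffle.manager at the new class.
// Shuffles that fail canUseUnsafeShuffle's checks fall back to
// SortShuffleManager through the delegation described in the class docs.
val conf = new SparkConf()
  .setAppName("unsafe-shuffle-example")
  .set("spark.shuffle.manager", "org.apache.spark.shuffle.unsafe.UnsafeShuffleManager")
val sc = new SparkContext(conf)
```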